package org.sonar.ce;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
import org.sonar.ce.taskprocessor.CeWorkerFactory;
import static com.google.common.base.Preconditions.checkState;
private final CeWorkerFactory ceCeWorkerFactory;
private Set<String> workerUUIDs;
+ private Lock cleanJobLock = new NonConcurrentLock();
+
public StandaloneCeDistributedInformation(CeWorkerFactory ceCeWorkerFactory) {
this.ceCeWorkerFactory = ceCeWorkerFactory;
}
public void broadcastWorkerUUIDs() {
workerUUIDs = ceCeWorkerFactory.getWorkerUUIDs();
}
+
+ /**
+ * Since StandaloneCeDistributedInformation in fact does not provide any distribution support, the lock returned by
+ * this method provides no concurrency support at all.
+ */
+ @Override
+ public Lock acquireCleanJobLock() {
+ return cleanJobLock;
+ }
+
+ private static class NonConcurrentLock implements Lock {
+ @Override
+ public void lock() {
+ // return immediately and never block
+ }
+
+ @Override
+ public void lockInterruptibly() throws InterruptedException {
+ // return immediately and never block
+ }
+
+ @Override
+ public boolean tryLock() {
+ // always succeed
+ return true;
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+ // always succeed
+ return true;
+ }
+
+ @Override
+ public void unlock() {
+ // nothing to do
+ }
+
+ @Override
+ public Condition newCondition() {
+ throw new UnsupportedOperationException("newCondition not supported");
+ }
+ }
}
package org.sonar.ce;
import com.google.common.collect.ImmutableSet;
+import java.util.Random;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.stream.IntStream;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.sonar.ce.taskprocessor.CeWorkerFactory;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
ceCluster.getWorkerUUIDs();
}
+
+ @Test
+ public void acquireCleanJobLock_returns_a_non_current_lock() {
+ StandaloneCeDistributedInformation underTest = new StandaloneCeDistributedInformation(mock(CeWorkerFactory.class));
+
+ Lock lock = underTest.acquireCleanJobLock();
+
+ IntStream.range(0, 5 + Math.abs(new Random().nextInt(50)))
+ .forEach(i -> {
+ try {
+ assertThat(lock.tryLock()).isTrue();
+ assertThat(lock.tryLock(1, TimeUnit.MINUTES)).isTrue();
+ lock.lock();
+ lock.lockInterruptibly();
+ lock.unlock();
+ } catch (InterruptedException e) {
+ fail("no InterruptedException should be thrown");
+ }
+ try {
+ lock.newCondition();
+ fail("a UnsupportedOperationException should have been thrown");
+ } catch (UnsupportedOperationException e) {
+ assertThat(e.getMessage()).isEqualTo("newCondition not supported");
+ }
+ });
+ }
}
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
import org.junit.Test;
import org.sonar.ce.CeDistributedInformation;
import org.sonar.ce.configuration.CeConfiguration;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class CeCleaningSchedulerImplTest {
+
+ private Lock jobLock = mock(Lock.class);
+
@Test
public void startScheduling_does_not_fail_if_cleaning_methods_send_even_an_Exception() {
InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
- CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
- @Override
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
- // synchronously execute command
- command.run();
- return null;
- }
- }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class));
+ CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock);
+ CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation);
Exception exception = new IllegalArgumentException("faking unchecked exception thrown by cancelWornOuts");
doThrow(exception).when(mockedInternalCeQueue).cancelWornOuts();
doThrow(exception).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
@Test
public void startScheduling_fails_if_cancelWornOuts_send_an_Error() {
InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
- CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
- @Override
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
- // synchronously execute command
- command.run();
- return null;
- }
- }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class));
+ CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock);
+ CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation);
Error expected = new Error("faking Error thrown by cancelWornOuts");
doThrow(expected).when(mockedInternalCeQueue).cancelWornOuts();
@Test
public void startScheduling_fails_if_resetTasksWithUnknownWorkerUUIDs_send_an_Error() {
InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
- CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
- @Override
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
- // synchronously execute command
- command.run();
- return null;
- }
- }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class));
+ CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock);
+ CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation);
Error expected = new Error("faking Error thrown by cancelWornOuts");
doThrow(expected).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
verify(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
}
+ @Test
+ public void startScheduling_must_call_the_lock_methods() {
+ InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
+ CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock);
+ CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation);
+ underTest.startScheduling();
+
+ verify(mockedCeDistributedInformation, times(1)).acquireCleanJobLock();
+ verify(jobLock, times(1)).tryLock();
+ verify(jobLock, times(1)).unlock();
+ }
+
+ @Test
+ public void startScheduling_must_not_execute_method_if_lock_is_already_acquired() {
+ InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
+ CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock);
+ when(jobLock.tryLock()).thenReturn(false);
+
+ CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation);
+ underTest.startScheduling();
+
+ verify(mockedCeDistributedInformation, times(1)).acquireCleanJobLock();
+ verify(jobLock, times(1)).tryLock();
+ // since lock cannot be locked, unlock method is not been called
+ verify(jobLock, times(0)).unlock();
+ // since lock cannot be locked, cleaning job methods must not be called
+ verify(mockedInternalCeQueue, times(0)).resetTasksWithUnknownWorkerUUIDs(any());
+ verify(mockedInternalCeQueue, times(0)).cancelWornOuts();
+ }
+
@Test
public void startScheduling_calls_cleaning_methods_of_internalCeQueue_at_fixed_rate_with_value_from_CeConfiguration() {
InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
return null;
}
};
- CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(executorService, mockedCeConfiguration, mockedInternalCeQueue, mock(CeDistributedInformation.class));
+ CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(executorService, mockedCeConfiguration,
+ mockedInternalCeQueue, mockCeDistributedInformation(jobLock));
underTest.startScheduling();
assertThat(executorService.schedulerCounter).isEqualTo(1);
return mockedCeConfiguration;
}
+ private CeCleaningSchedulerImpl mockCeCleaningSchedulerImpl(InternalCeQueue internalCeQueue, CeDistributedInformation ceDistributedInformation) {
+ return new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ // synchronously execute command
+ command.run();
+ return null;
+ }
+ }, mockCeConfiguration(1, 10), internalCeQueue, ceDistributedInformation);
+ }
+
+ private CeDistributedInformation mockCeDistributedInformation(Lock result) {
+ CeDistributedInformation mocked = mock(CeDistributedInformation.class);
+ when(mocked.acquireCleanJobLock()).thenReturn(result);
+ when(result.tryLock()).thenReturn(true);
+ return mocked;
+ }
+
/**
* Implementation of {@link CeCleaningExecutorService} which throws {@link UnsupportedOperationException} for every
* method.