]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-9040 Implement a lock mechanism for cleaning jobs
authorEric Hartmann <hartmann.eric@gmail.com>
Fri, 14 Apr 2017 15:20:26 +0000 (17:20 +0200)
committerEric Hartmann <hartmann.eric@gmail.Com>
Thu, 27 Apr 2017 07:23:18 +0000 (09:23 +0200)
SONAR-8986 Add guava dependency

server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformation.java
server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformationImpl.java
server/sonar-ce/src/main/java/org/sonar/ce/StandaloneCeDistributedInformation.java
server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java
server/sonar-ce/src/test/java/org/sonar/ce/StandaloneCeDistributedInformationTest.java
server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java
server/sonar-process/src/main/java/org/sonar/process/cluster/ClusterObjectKeys.java

index a989272d2158755801f47aa560d4aff82f6d8da9..a201382bf947f5e3342f2622968224f2e32d0de2 100644 (file)
@@ -21,6 +21,7 @@
 package org.sonar.ce;
 
 import java.util.Set;
+import java.util.concurrent.locks.Lock;
 
 /**
  * CeDistributedInformation is the interface to be implemented in order
@@ -34,4 +35,9 @@ public interface CeDistributedInformation {
    * are up so that they are shared with other Compute Engine nodes
    */
   void broadcastWorkerUUIDs();
+
+  /**
+   * Acquire a lock among all the Compute Engines
+   */
+  Lock acquireCleanJobLock();
 }
index a94b9d876810e75792f8a60084d76fa817d86c3b..cfb52e7cc95c72f241e734798da918b0ffe6270f 100644 (file)
@@ -22,9 +22,11 @@ package org.sonar.ce;
 
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.Lock;
 import org.picocontainer.Startable;
 import org.sonar.ce.cluster.HazelcastClientWrapper;
 import org.sonar.ce.taskprocessor.CeWorkerFactory;
+import org.sonar.process.cluster.ClusterObjectKeys;
 
 import static org.sonar.core.util.stream.MoreCollectors.toSet;
 import static org.sonar.process.cluster.ClusterObjectKeys.WORKER_UUIDS;
@@ -57,6 +59,11 @@ public class CeDistributedInformationImpl implements CeDistributedInformation, S
     getClusteredWorkerUUIDs().put(hazelcastClientWrapper.getClientUUID(), ceCeWorkerFactory.getWorkerUUIDs());
   }
 
+  @Override
+  public Lock acquireCleanJobLock() {
+    return hazelcastClientWrapper.getLock(ClusterObjectKeys.CE_CLEANING_JOB_LOCK);
+  }
+
   @Override
   public void start() {
     // Nothing to do here
index da83705d7fc39ea1026449876510c42f3e373438..7fcbd2b842a8ced950e606e4a8872b3d7e5e0806 100644 (file)
@@ -21,6 +21,9 @@
 package org.sonar.ce;
 
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
 import org.sonar.ce.taskprocessor.CeWorkerFactory;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -32,6 +35,8 @@ public class StandaloneCeDistributedInformation implements CeDistributedInformat
   private final CeWorkerFactory ceCeWorkerFactory;
   private Set<String> workerUUIDs;
 
+  private Lock cleanJobLock = new NonConcurrentLock();
+
   public StandaloneCeDistributedInformation(CeWorkerFactory ceCeWorkerFactory) {
     this.ceCeWorkerFactory = ceCeWorkerFactory;
   }
@@ -46,4 +51,47 @@ public class StandaloneCeDistributedInformation implements CeDistributedInformat
   public void broadcastWorkerUUIDs() {
     workerUUIDs = ceCeWorkerFactory.getWorkerUUIDs();
   }
+
+  /**
+   * Since StandaloneCeDistributedInformation in fact does not provide any distribution support, the lock returned by
+   * this method provides no concurrency support at all.
+   */
+  @Override
+  public Lock acquireCleanJobLock() {
+    return cleanJobLock;
+  }
+
+  private static class NonConcurrentLock implements Lock {
+    @Override
+    public void lock() {
+      // return immediately and never block
+    }
+
+    @Override
+    public void lockInterruptibly() throws InterruptedException {
+      // return immediately and never block
+    }
+
+    @Override
+    public boolean tryLock() {
+      // always succeed
+      return true;
+    }
+
+    @Override
+    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+      // always succeed
+      return true;
+    }
+
+    @Override
+    public void unlock() {
+      // nothing to do
+    }
+
+    @Override
+    public Condition newCondition() {
+      throw new UnsupportedOperationException("newCondition not supported");
+    }
+  }
 }
index 1fdb2c5a236d27a8f95fd0a6735780ccd6eb9795..8c3e811eb67080ad725bf5756bf0c9a64c40e8db 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.sonar.ce.cleaning;
 
+import java.util.concurrent.locks.Lock;
 import org.sonar.api.utils.log.Logger;
 import org.sonar.api.utils.log.Loggers;
 import org.sonar.ce.CeDistributedInformation;
@@ -52,8 +53,18 @@ public class CeCleaningSchedulerImpl implements CeCleaningScheduler {
   }
 
   private void cleanCeQueue() {
-    cancelWornOuts();
-    resetTasksWithUnknownWorkerUUIDs();
+    Lock ceCleaningJobLock = ceDistributedInformation.acquireCleanJobLock();
+
+    // If we cannot lock that means that another job is running
+    // So we skip the cancelWornOuts() method
+    if (ceCleaningJobLock.tryLock()) {
+      try {
+        cancelWornOuts();
+        resetTasksWithUnknownWorkerUUIDs();
+      } finally {
+        ceCleaningJobLock.unlock();
+      }
+    }
   }
 
   private void cancelWornOuts() {
index 1585bcce051b96e3b1799ff06514c667305e705b..83383eb07f93f5277c7febec8a9111d7a13080df 100644 (file)
 package org.sonar.ce;
 
 import com.google.common.collect.ImmutableSet;
+import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.stream.IntStream;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.sonar.ce.taskprocessor.CeWorkerFactory;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -67,4 +72,30 @@ public class StandaloneCeDistributedInformationTest {
 
     ceCluster.getWorkerUUIDs();
   }
+
+  @Test
+  public void acquireCleanJobLock_returns_a_non_current_lock() {
+    StandaloneCeDistributedInformation underTest = new StandaloneCeDistributedInformation(mock(CeWorkerFactory.class));
+
+    Lock lock = underTest.acquireCleanJobLock();
+
+    IntStream.range(0, 5 + Math.abs(new Random().nextInt(50)))
+      .forEach(i -> {
+        try {
+          assertThat(lock.tryLock()).isTrue();
+          assertThat(lock.tryLock(1, TimeUnit.MINUTES)).isTrue();
+          lock.lock();
+          lock.lockInterruptibly();
+          lock.unlock();
+        } catch (InterruptedException e) {
+          fail("no InterruptedException should be thrown");
+        }
+        try {
+          lock.newCondition();
+          fail("a UnsupportedOperationException should have been thrown");
+        } catch (UnsupportedOperationException e) {
+          assertThat(e.getMessage()).isEqualTo("newCondition not supported");
+        }
+      });
+  }
 }
index bf91352b379d413263ecb7271204c3e741be81eb..f9169ffbba9a7a76e38e9eeed13af94e106776ec 100644 (file)
@@ -27,6 +27,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
 import org.junit.Test;
 import org.sonar.ce.CeDistributedInformation;
 import org.sonar.ce.configuration.CeConfiguration;
@@ -37,21 +38,19 @@ import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class CeCleaningSchedulerImplTest {
+
+  private Lock jobLock = mock(Lock.class);
+
   @Test
   public void startScheduling_does_not_fail_if_cleaning_methods_send_even_an_Exception() {
     InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
-    CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
-      @Override
-      public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
-        // synchronously execute command
-        command.run();
-        return null;
-      }
-    }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class));
+    CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock);
+    CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation);
     Exception exception = new IllegalArgumentException("faking unchecked exception thrown by cancelWornOuts");
     doThrow(exception).when(mockedInternalCeQueue).cancelWornOuts();
     doThrow(exception).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
@@ -65,14 +64,8 @@ public class CeCleaningSchedulerImplTest {
   @Test
   public void startScheduling_fails_if_cancelWornOuts_send_an_Error() {
     InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
-    CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
-      @Override
-      public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
-        // synchronously execute command
-        command.run();
-        return null;
-      }
-    }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class));
+    CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock);
+    CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation);
     Error expected = new Error("faking Error thrown by cancelWornOuts");
     doThrow(expected).when(mockedInternalCeQueue).cancelWornOuts();
 
@@ -88,14 +81,8 @@ public class CeCleaningSchedulerImplTest {
   @Test
   public void startScheduling_fails_if_resetTasksWithUnknownWorkerUUIDs_send_an_Error() {
     InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
-    CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
-      @Override
-      public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
-        // synchronously execute command
-        command.run();
-        return null;
-      }
-    }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class));
+    CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock);
+    CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation);
     Error expected = new Error("faking Error thrown by cancelWornOuts");
     doThrow(expected).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
 
@@ -108,6 +95,36 @@ public class CeCleaningSchedulerImplTest {
     verify(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
   }
 
+  @Test
+  public void startScheduling_must_call_the_lock_methods() {
+    InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
+    CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock);
+    CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation);
+    underTest.startScheduling();
+
+    verify(mockedCeDistributedInformation, times(1)).acquireCleanJobLock();
+    verify(jobLock, times(1)).tryLock();
+    verify(jobLock, times(1)).unlock();
+  }
+
+  @Test
+  public void startScheduling_must_not_execute_method_if_lock_is_already_acquired() {
+    InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
+    CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock);
+    when(jobLock.tryLock()).thenReturn(false);
+
+    CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation);
+    underTest.startScheduling();
+
+    verify(mockedCeDistributedInformation, times(1)).acquireCleanJobLock();
+    verify(jobLock, times(1)).tryLock();
+    // since lock cannot be locked, unlock method is not been called
+    verify(jobLock, times(0)).unlock();
+    // since lock cannot be locked, cleaning job methods must not be called
+    verify(mockedInternalCeQueue, times(0)).resetTasksWithUnknownWorkerUUIDs(any());
+    verify(mockedInternalCeQueue, times(0)).cancelWornOuts();
+  }
+
   @Test
   public void startScheduling_calls_cleaning_methods_of_internalCeQueue_at_fixed_rate_with_value_from_CeConfiguration() {
     InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
@@ -139,7 +156,8 @@ public class CeCleaningSchedulerImplTest {
         return null;
       }
     };
-    CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(executorService, mockedCeConfiguration, mockedInternalCeQueue, mock(CeDistributedInformation.class));
+    CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(executorService, mockedCeConfiguration,
+      mockedInternalCeQueue, mockCeDistributedInformation(jobLock));
 
     underTest.startScheduling();
     assertThat(executorService.schedulerCounter).isEqualTo(1);
@@ -153,6 +171,24 @@ public class CeCleaningSchedulerImplTest {
     return mockedCeConfiguration;
   }
 
+  private CeCleaningSchedulerImpl mockCeCleaningSchedulerImpl(InternalCeQueue internalCeQueue, CeDistributedInformation ceDistributedInformation) {
+    return new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
+      @Override
+      public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+        // synchronously execute command
+        command.run();
+        return null;
+      }
+    }, mockCeConfiguration(1, 10), internalCeQueue, ceDistributedInformation);
+  }
+
+  private CeDistributedInformation mockCeDistributedInformation(Lock result) {
+    CeDistributedInformation mocked = mock(CeDistributedInformation.class);
+    when(mocked.acquireCleanJobLock()).thenReturn(result);
+    when(result.tryLock()).thenReturn(true);
+    return mocked;
+  }
+
   /**
    * Implementation of {@link CeCleaningExecutorService} which throws {@link UnsupportedOperationException} for every
    * method.
index a11808d27fd921696c9b65dc0ac01bd7081f85ea..1f7d9be3eade469e7360254fa38b375d55d30dd7 100644 (file)
@@ -53,4 +53,10 @@ public final class ClusterObjectKeys {
    * The key of replicated map holding the CeWorker UUIDs
    */
   public static final String WORKER_UUIDS = "WORKER_UUIDS";
+
+  /**
+   * The key of the lock for executing CE_CLEANING_JOB
+   * {@link CeCleaningSchedulerImpl}
+   */
+  public static final String CE_CLEANING_JOB_LOCK = "CE_CLEANING_JOB_LOCK";
 }