]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-8986 add purge of tasks of non existing workers to ce clean job
authorEric Hartmann <hartmann.eric@gmail.Com>
Fri, 14 Apr 2017 09:58:04 +0000 (11:58 +0200)
committerEric Hartmann <hartmann.eric@gmail.Com>
Thu, 27 Apr 2017 07:23:18 +0000 (09:23 +0200)
14 files changed:
server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfiguration.java
server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfigurationImpl.java
server/sonar-ce-api/src/test/java/org/sonar/ce/configuration/CeConfigurationImplTest.java
server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java
server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java
server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java
server/sonar-ce/src/test/java/org/sonar/ce/configuration/CeConfigurationRule.java
server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java
server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java
server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java
server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml
server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java

index dd27ba3f88209397234ce78f96a57cc779bce86d..393ec1d6f52ed27ae5bdd54132d358bd445bb989 100644 (file)
@@ -27,18 +27,19 @@ public interface CeConfiguration {
   int getWorkerCount();
 
   /**
-   * The delay in millisecond before a {@link CeWorker} shall try and find a task
+   * The delay in millisecond before a {@link org.sonar.ce.taskprocessor.CeWorker} shall try and find a task
    * to process when it's previous execution had nothing to do.
    */
   long getQueuePollingDelay();
 
   /**
-   * Delay before running job that cancels worn out tasks for the first time (in minutes).
+   * Delay before running job that cleans CE tasks for the first time (in minutes).
    */
-  long getCancelWornOutsInitialDelay();
+  long getCleanCeTasksInitialDelay();
 
   /**
-   * Delay between the end of a run and the start of the next one of the job that cancels worn out CE tasks (in minutes).
+   * Delay between the end of a run and the start of the next one of the job that cleans CE tasks (in minutes).
    */
-  long getCancelWornOutsDelay();
+  long getCleanCeTasksDelay();
+
 }
index 086cd79708c43d89aba8c26bd00826dddfa09967..77e6f78be49b108c4b4013445be4c68e26b18301 100644 (file)
@@ -99,12 +99,13 @@ public class CeConfigurationImpl implements CeConfiguration, Startable {
   }
 
   @Override
-  public long getCancelWornOutsInitialDelay() {
+  public long getCleanCeTasksInitialDelay() {
     return CANCEL_WORN_OUTS_INITIAL_DELAY;
   }
 
   @Override
-  public long getCancelWornOutsDelay() {
+  public long getCleanCeTasksDelay() {
     return CANCEL_WORN_OUTS_DELAY;
   }
+
 }
index 9d7f3f0b6db7b2909684a9eb7c7bedec88c5d4ca..11a80512919dc6e338339c64049b1abd8f9981fa 100644 (file)
@@ -111,14 +111,14 @@ public class CeConfigurationImplTest {
   }
 
   @Test
-  public void getCancelWornOutsInitialDelay_returns_1() {
-    assertThat(new CeConfigurationImpl(settings).getCancelWornOutsInitialDelay())
+  public void getCleanCeTasksInitialDelay_returns_1() {
+    assertThat(new CeConfigurationImpl(settings).getCleanCeTasksInitialDelay())
       .isEqualTo(1L);
   }
 
   @Test
-  public void getCancelWornOutsDelay_returns_10() {
-    assertThat(new CeConfigurationImpl(settings).getCancelWornOutsDelay())
+  public void getCleanCeTasksDelay_returns_10() {
+    assertThat(new CeConfigurationImpl(settings).getCleanCeTasksDelay())
       .isEqualTo(10L);
   }
 }
index 82675c693d62b8ba3d4da51840c4ba203e06e46a..1fdb2c5a236d27a8f95fd0a6735780ccd6eb9795 100644 (file)
@@ -21,6 +21,7 @@ package org.sonar.ce.cleaning;
 
 import org.sonar.api.utils.log.Logger;
 import org.sonar.api.utils.log.Loggers;
+import org.sonar.ce.CeDistributedInformation;
 import org.sonar.ce.configuration.CeConfiguration;
 import org.sonar.ce.queue.InternalCeQueue;
 
@@ -32,27 +33,44 @@ public class CeCleaningSchedulerImpl implements CeCleaningScheduler {
   private final CeCleaningExecutorService executorService;
   private final CeConfiguration ceConfiguration;
   private final InternalCeQueue internalCeQueue;
+  private final CeDistributedInformation ceDistributedInformation;
 
-  public CeCleaningSchedulerImpl(CeCleaningExecutorService executorService, CeConfiguration ceConfiguration, InternalCeQueue internalCeQueue) {
+  public CeCleaningSchedulerImpl(CeCleaningExecutorService executorService, CeConfiguration ceConfiguration,
+    InternalCeQueue internalCeQueue, CeDistributedInformation ceDistributedInformation) {
     this.executorService = executorService;
     this.internalCeQueue = internalCeQueue;
     this.ceConfiguration = ceConfiguration;
+    this.ceDistributedInformation = ceDistributedInformation;
   }
 
   @Override
   public void startScheduling() {
-    executorService.scheduleWithFixedDelay(this::cancelWornOuts,
-      ceConfiguration.getCancelWornOutsInitialDelay(),
-      ceConfiguration.getCancelWornOutsDelay(),
+    executorService.scheduleWithFixedDelay(this::cleanCeQueue,
+      ceConfiguration.getCleanCeTasksInitialDelay(),
+      ceConfiguration.getCleanCeTasksDelay(),
       MINUTES);
   }
 
+  private void cleanCeQueue() {
+    cancelWornOuts();
+    resetTasksWithUnknownWorkerUUIDs();
+  }
+
   private void cancelWornOuts() {
     try {
-      LOG.info("Deleting any worn out task");
+      LOG.debug("Deleting any worn out task");
       internalCeQueue.cancelWornOuts();
     } catch (Exception e) {
       LOG.warn("Failed to cancel worn out tasks", e);
     }
   }
+
+  private void resetTasksWithUnknownWorkerUUIDs() {
+    try {
+      LOG.debug("Resetting state of tasks with unknown worker UUIDs");
+      internalCeQueue.resetTasksWithUnknownWorkerUUIDs(ceDistributedInformation.getWorkerUUIDs());
+    } catch (Exception e) {
+      LOG.warn("Failed to reset tasks with unknown worker UUIDs", e);
+    }
+  }
 }
index 42447633920b4ca546753cc66ff2e829d713444c..9cdc5243de9a3b95c072429008aadd4fd07dadc9 100644 (file)
 package org.sonar.ce.queue;
 
 import java.util.Optional;
+import java.util.Set;
 import javax.annotation.Nullable;
-import org.sonar.ce.queue.CeQueue;
-import org.sonar.ce.queue.CeTask;
-import org.sonar.ce.queue.CeTaskResult;
 import org.sonar.db.ce.CeActivityDto.Status;
 
 /**
@@ -70,6 +68,8 @@ public interface InternalCeQueue extends CeQueue {
 
   void cancelWornOuts();
 
+  void resetTasksWithUnknownWorkerUUIDs(Set<String> knownWorkerUUIDs);
+
   void pausePeek();
 
   void resumePeek();
index 8e5bc19bf8fba80bdcf07d191d19ba97c20698b5..8a2009dc25bb82a4a23d1cf3855da9fa187f1767 100644 (file)
@@ -25,6 +25,7 @@ import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.CheckForNull;
 import javax.annotation.Nullable;
@@ -86,7 +87,6 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
         queueStatus.addInProgress();
       }
       return Optional.ofNullable(task);
-
     }
   }
 
@@ -172,6 +172,14 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
     }
   }
 
+  @Override
+  public void resetTasksWithUnknownWorkerUUIDs(Set<String> knownWorkerUUIDs) {
+    try (DbSession dbSession = dbClient.openSession(false)) {
+      dbClient.ceQueueDao().resetTasksWithUnknownWorkerUUIDs(dbSession, knownWorkerUUIDs);
+      dbSession.commit();
+    }
+  }
+
   @Override
   public void pausePeek() {
     this.peekPaused.set(true);
index 5449865fc7e3b4a295f7bbe4f603a8c63170372b..bf91352b379d413263ecb7271204c3e741be81eb 100644 (file)
@@ -28,11 +28,13 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.junit.Test;
+import org.sonar.ce.CeDistributedInformation;
 import org.sonar.ce.configuration.CeConfiguration;
 import org.sonar.ce.queue.InternalCeQueue;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -40,7 +42,7 @@ import static org.mockito.Mockito.when;
 
 public class CeCleaningSchedulerImplTest {
   @Test
-  public void startScheduling_does_not_fail_if_cancelWornOuts_send_even_an_Exception() {
+  public void startScheduling_does_not_fail_if_cleaning_methods_send_even_an_Exception() {
     InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
     CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
       @Override
@@ -49,16 +51,19 @@ public class CeCleaningSchedulerImplTest {
         command.run();
         return null;
       }
-    }, mockCeConfiguration(1, 10), mockedInternalCeQueue);
-    doThrow(new IllegalArgumentException("faking unchecked exception thrown by cancelWornOuts")).when(mockedInternalCeQueue).cancelWornOuts();
+    }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class));
+    Exception exception = new IllegalArgumentException("faking unchecked exception thrown by cancelWornOuts");
+    doThrow(exception).when(mockedInternalCeQueue).cancelWornOuts();
+    doThrow(exception).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
 
     underTest.startScheduling();
 
     verify(mockedInternalCeQueue).cancelWornOuts();
+    verify(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
   }
 
   @Test
-  public void startScheduling_fails_if_cancelWornOuts_send_even_an_Error() {
+  public void startScheduling_fails_if_cancelWornOuts_send_an_Error() {
     InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
     CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
       @Override
@@ -67,7 +72,7 @@ public class CeCleaningSchedulerImplTest {
         command.run();
         return null;
       }
-    }, mockCeConfiguration(1, 10), mockedInternalCeQueue);
+    }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class));
     Error expected = new Error("faking Error thrown by cancelWornOuts");
     doThrow(expected).when(mockedInternalCeQueue).cancelWornOuts();
 
@@ -77,36 +82,74 @@ public class CeCleaningSchedulerImplTest {
     } catch (Error e) {
       assertThat(e).isSameAs(expected);
     }
+    verify(mockedInternalCeQueue).cancelWornOuts();
   }
 
   @Test
-  public void startScheduling_calls_cancelWornOuts_of_internalCeQueue_at_fixed_rate_with_value_from_CeConfiguration() {
+  public void startScheduling_fails_if_resetTasksWithUnknownWorkerUUIDs_send_an_Error() {
     InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
-    long initialDelay = 10L;
-    long delay = 20L;
-    CeConfiguration mockedCeConfiguration = mockCeConfiguration(initialDelay, delay);
+    CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
+      @Override
+      public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+        // synchronously execute command
+        command.run();
+        return null;
+      }
+    }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class));
+    Error expected = new Error("faking Error thrown by cancelWornOuts");
+    doThrow(expected).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
+
+    try {
+      underTest.startScheduling();
+      fail("the error should have been thrown");
+    } catch (Error e) {
+      assertThat(e).isSameAs(expected);
+    }
+    verify(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
+  }
+
+  @Test
+  public void startScheduling_calls_cleaning_methods_of_internalCeQueue_at_fixed_rate_with_value_from_CeConfiguration() {
+    InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
+    long wornOutInitialDelay = 10L;
+    long wornOutDelay = 20L;
+    long unknownWorkerInitialDelay = 11L;
+    long unknownWorkerDelay = 21L;
+    CeConfiguration mockedCeConfiguration = mockCeConfiguration(wornOutInitialDelay, wornOutDelay);
     CeCleaningAdapter executorService = new CeCleaningAdapter() {
       @Override
       public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initDelay, long period, TimeUnit unit) {
-        assertThat(initDelay).isEqualTo(initialDelay);
-        assertThat(period).isEqualTo(delay);
-        assertThat(unit).isEqualTo(TimeUnit.MINUTES);
+        schedulerCounter++;
+        switch(schedulerCounter) {
+          case 1:
+            assertThat(initDelay).isEqualTo(wornOutInitialDelay);
+            assertThat(period).isEqualTo(wornOutDelay);
+            assertThat(unit).isEqualTo(TimeUnit.MINUTES);
+            break;
+          case 2:
+            assertThat(initDelay).isEqualTo(unknownWorkerInitialDelay);
+            assertThat(period).isEqualTo(unknownWorkerDelay);
+            assertThat(unit).isEqualTo(TimeUnit.MINUTES);
+            break;
+          default:
+            fail("Unknwon call of scheduleWithFixedDelay");
+        }
         // synchronously execute command
         command.run();
         return null;
       }
     };
-    CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(executorService, mockedCeConfiguration, mockedInternalCeQueue);
+    CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(executorService, mockedCeConfiguration, mockedInternalCeQueue, mock(CeDistributedInformation.class));
 
     underTest.startScheduling();
-
+    assertThat(executorService.schedulerCounter).isEqualTo(1);
     verify(mockedInternalCeQueue).cancelWornOuts();
   }
 
-  private CeConfiguration mockCeConfiguration(long initialDelay, long delay) {
+  private CeConfiguration mockCeConfiguration(long cleanCeTasksInitialDelay, long cleanCeTasksDelay) {
     CeConfiguration mockedCeConfiguration = mock(CeConfiguration.class);
-    when(mockedCeConfiguration.getCancelWornOutsInitialDelay()).thenReturn(initialDelay);
-    when(mockedCeConfiguration.getCancelWornOutsDelay()).thenReturn(delay);
+    when(mockedCeConfiguration.getCleanCeTasksInitialDelay()).thenReturn(cleanCeTasksInitialDelay);
+    when(mockedCeConfiguration.getCleanCeTasksDelay()).thenReturn(cleanCeTasksDelay);
     return mockedCeConfiguration;
   }
 
@@ -115,6 +158,7 @@ public class CeCleaningSchedulerImplTest {
    * method.
    */
   private static class CeCleaningAdapter implements CeCleaningExecutorService {
+    protected int schedulerCounter = 0;
 
     @Override
     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
index 1c69bb40c25d286f989211f8bd3fafd1b5790c1e..61e22fbbdc8d286f964f80a2f1d7b35900a71bd0 100644 (file)
@@ -54,21 +54,21 @@ public class CeConfigurationRule extends ExternalResource implements CeConfigura
   }
 
   @Override
-  public long getCancelWornOutsInitialDelay() {
+  public long getCleanCeTasksInitialDelay() {
     return cancelWornOutsInitialDelay;
   }
 
-  public void setCancelWornOutsInitialDelay(long cancelWornOutsInitialDelay) {
+  public void setCleanCeTasksInitialDelay(long cancelWornOutsInitialDelay) {
     checkArgument(cancelWornOutsInitialDelay > 0, "cancel worn-outs polling initial delay must be >= 1");
     this.cancelWornOutsInitialDelay = cancelWornOutsInitialDelay;
   }
 
   @Override
-  public long getCancelWornOutsDelay() {
+  public long getCleanCeTasksDelay() {
     return cancelWornOutsDelay;
   }
 
-  public void setCancelWornOutsDelay(long cancelWornOutsDelay) {
+  public void setCleanCeTasksDelay(long cancelWornOutsDelay) {
     checkArgument(cancelWornOutsDelay > 0, "cancel worn-outs polling delay must be >= 1");
     this.cancelWornOutsDelay = cancelWornOutsDelay;
   }
index 1ec6f93211578b3601e1406e6885d05a8b55856b..0fd5c4de753e909b54076d1cdaff99f0d11c2b05 100644 (file)
@@ -135,14 +135,15 @@ public class CeTasksMBeanImplTest {
     }
 
     @Override
-    public long getCancelWornOutsInitialDelay() {
-      throw new UnsupportedOperationException("getCancelWornOutsInitialDelay is not implemented");
+    public long getCleanCeTasksInitialDelay() {
+      throw new UnsupportedOperationException("getCleanCeTasksInitialDelay is not implemented");
     }
 
     @Override
-    public long getCancelWornOutsDelay() {
-      throw new UnsupportedOperationException("getCancelWornOutsDelay is not implemented");
+    public long getCleanCeTasksDelay() {
+      throw new UnsupportedOperationException("getCleanCeTasksDelay is not implemented");
     }
+
   }
 
   @CheckForNull
index 93340a397583e7f50505d51cbd9df152ba02fcb0..7162d2967a92c1e228c9ef1e90defb8fde6bdda4 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.sonar.ce.queue;
 
+import com.google.common.collect.ImmutableSet;
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.util.List;
@@ -530,17 +531,113 @@ public class InternalCeQueueImplTest {
 
     underTest.cancelWornOuts();
 
-    verifyUnmodifiedByCancelWornOuts(u1);
-    verifyUnmodifiedByCancelWornOuts(u2);
+    verifyUnmodified(u1);
+    verifyUnmodified(u2);
     verifyCanceled(u3);
     verifyCanceled(u4);
-    verifyUnmodifiedByCancelWornOuts(u5);
-    verifyUnmodifiedByCancelWornOuts(u6);
-    verifyUnmodifiedByCancelWornOuts(u7);
-    verifyUnmodifiedByCancelWornOuts(u8);
+    verifyUnmodified(u5);
+    verifyUnmodified(u6);
+    verifyUnmodified(u7);
+    verifyUnmodified(u8);
   }
 
-  private void verifyUnmodifiedByCancelWornOuts(CeQueueDto original) {
+  @Test
+  public void resetTasksWithUnknownWorkerUUIDs_reset_only_in_progress_tasks() {
+    CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, null);
+    CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1");
+    CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, null);
+    CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker2");
+    CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, null);
+    CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1");
+    CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker2");
+    CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker3");
+
+    underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of("worker2", "worker3"));
+
+    // Pending tasks must not be modified even if a workerUUID is not present
+    verifyUnmodified(u1);
+    verifyUnmodified(u2);
+    verifyUnmodified(u3);
+    verifyUnmodified(u4);
+
+    // Unknown worker : null, "worker1"
+    verifyReset(u5);
+    verifyReset(u6);
+
+    // Known workers : "worker2", "worker3"
+    verifyUnmodified(u7);
+    verifyUnmodified(u8);
+  }
+
+  @Test
+  public void resetTasksWithUnknownWorkerUUIDs_with_empty_set_will_reset_all_in_progress_tasks() {
+    CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, null);
+    CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1");
+    CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, null);
+    CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker2");
+    CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, null);
+    CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1");
+    CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker2");
+    CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker3");
+
+    underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of());
+
+    // Pending tasks must not be modified even if a workerUUID is not present
+    verifyUnmodified(u1);
+    verifyUnmodified(u2);
+    verifyUnmodified(u3);
+    verifyUnmodified(u4);
+
+    // Unknown worker : null, "worker1"
+    verifyReset(u5);
+    verifyReset(u6);
+    verifyReset(u7);
+    verifyReset(u8);
+  }
+
+  @Test
+  public void resetTasksWithUnknownWorkerUUIDs_with_worker_without_tasks_will_reset_all_in_progress_tasks() {
+    CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, null);
+    CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1");
+    CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, null);
+    CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker2");
+    CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, null);
+    CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1");
+    CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker2");
+    CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker3");
+
+    underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of("worker1000", "worker1001"));
+
+    // Pending tasks must not be modified even if a workerUUID is not present
+    verifyUnmodified(u1);
+    verifyUnmodified(u2);
+    verifyUnmodified(u3);
+    verifyUnmodified(u4);
+
+    // Unknown worker : null, "worker1"
+    verifyReset(u5);
+    verifyReset(u6);
+    verifyReset(u7);
+    verifyReset(u8);
+  }
+
+  private void verifyReset(CeQueueDto original) {
+    CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), original.getUuid()).get();
+    // We do not touch ExecutionCount nor CreatedAt
+    assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount());
+    assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
+
+    // Status must have changed to PENDING and must not be equal to previous status
+    assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING).isNotEqualTo(original.getStatus());
+    // UpdatedAt must have been updated
+    assertThat(dto.getUpdatedAt()).isNotEqualTo(original.getUpdatedAt());
+    // StartedAt must be null
+    assertThat(dto.getStartedAt()).isNull();
+    // WorkerUuid must be null
+    assertThat(dto.getWorkerUuid()).isNull();
+  }
+
+  private void verifyUnmodified(CeQueueDto original) {
     CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), original.getUuid()).get();
     assertThat(dto.getStatus()).isEqualTo(original.getStatus());
     assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount());
index c4a747cd0785d33d80227f4c06ead5eaf6c95946..54f69953a44ed52280588838dc55eb51dde4457d 100644 (file)
@@ -21,6 +21,8 @@ package org.sonar.db.ce;
 
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import org.apache.ibatis.session.RowBounds;
 import org.sonar.api.utils.System2;
@@ -29,6 +31,7 @@ import org.sonar.db.DbSession;
 import org.sonar.db.Pagination;
 
 import static java.util.Collections.emptyList;
+import static org.sonar.db.DatabaseUtils.executeLargeUpdates;
 import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS;
 import static org.sonar.db.ce.CeQueueDto.Status.PENDING;
 
@@ -84,6 +87,17 @@ public class CeQueueDao implements Dao {
     return mapper(dbSession).selectPendingByMinimumExecutionCount(minExecutionCount);
   }
 
+  public void resetTasksWithUnknownWorkerUUIDs(DbSession dbSession, Set<String> knownWorkerUUIDs) {
+    if (knownWorkerUUIDs.isEmpty()) {
+      mapper(dbSession).resetAllInProgressTasks(system2.now());
+    } else {
+      // executeLargeUpdates won't call the SQL command if knownWorkerUUIDs is empty
+      executeLargeUpdates(knownWorkerUUIDs,
+        (Consumer<List<String>>) uuids -> mapper(dbSession).resetTasksWithUnknownWorkerUUIDs(uuids, system2.now())
+      );
+    }
+  }
+
   public CeQueueDto insert(DbSession session, CeQueueDto dto) {
     if (dto.getCreatedAt() == 0L || dto.getUpdatedAt() == 0L) {
       long now = system2.now();
index e08b952c1b708221f8e08451b362332b356055f1..42c64f5004018ba42e3fa851e3637a1c56064b68 100644 (file)
@@ -46,6 +46,17 @@ public interface CeQueueMapper {
    */
   List<CeQueueDto> selectPendingByMinimumExecutionCount(@Param("minExecutionCount") int minExecutionCount);
 
+  /**
+   * Select all tasks whose worker UUID is not present in {@code knownWorkerUUIDs}
+   */
+  void resetTasksWithUnknownWorkerUUIDs(@Param("knownWorkerUUIDs") List<String> knownWorkerUUIDs, @Param("updatedAt") long updatedAt);
+
+  /**
+   * Reset all IN_PROGRESS TASKS
+   */
+  void resetAllInProgressTasks(@Param("updatedAt") long updatedAt);
+
+
   int countByStatusAndComponentUuid(@Param("status") CeQueueDto.Status status, @Nullable @Param("componentUuid") String componentUuid);
 
   void insert(CeQueueDto dto);
index 90f60a858063634305083869ac627f0a45b6727d..d8d38a0cb0126daa7ed71c6f15bc4ac88cc9d9c2 100644 (file)
       uuid=#{uuid,jdbcType=VARCHAR}
   </delete>
 
+  <update id="resetTasksWithUnknownWorkerUUIDs">
+    update ce_queue set
+      status='PENDING',
+      worker_uuid=NULL,
+      started_at=NULL,
+      updated_at=#{updatedAt,jdbcType=BIGINT}
+    where
+      status = 'IN_PROGRESS'
+    and (
+      worker_uuid is NULL
+      or worker_uuid not in
+      <foreach collection="knownWorkerUUIDs" open="(" close=")" item="workerUUID" separator=",">
+        #{workerUUID,jdbcType=VARCHAR}
+      </foreach>
+    )
+  </update>
+
+  <update id="resetAllInProgressTasks">
+    update ce_queue set
+      status='PENDING',
+      worker_uuid=NULL,
+      started_at=NULL,
+      updated_at=#{updatedAt,jdbcType=BIGINT}
+    where
+      status = 'IN_PROGRESS'
+  </update>
 </mapper>
index 15605396f9dd9b56f7866f94428256928003afe3..9da7bd49eec2666b36c166da603973f3b8c9f330 100644 (file)
@@ -21,6 +21,7 @@ package org.sonar.db.ce;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -280,6 +281,65 @@ public class CeQueueDaoTest {
     verifyUnchangedByResetToPendingForWorker(o4);
   }
 
+
+  @Test
+  public void resetTasksWithUnknownWorkerUUIDs_with_empty_set_resets_status_of_all_pending_tasks() {
+    long startedAt = 2_099_888L;
+    CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_1, startedAt);
+    CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_1, startedAt);
+    CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_1, startedAt);
+    CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_1, startedAt);
+    CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_2, startedAt);
+    CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_2, startedAt);
+    CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_2, startedAt);
+    CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_2, startedAt);
+
+    underTestAlwaysIncreasingSystem2.resetTasksWithUnknownWorkerUUIDs(db.getSession(), ImmutableSet.of());
+
+    verifyResetByResetTasks(u1);
+    verifyUnchangedByResetToPendingForWorker(u2);
+    verifyUnchangedByResetToPendingForWorker(u3);
+    verifyResetByResetTasks(u4);
+    verifyResetByResetTasks(o1);
+    verifyUnchangedByResetToPendingForWorker(o2);
+    verifyUnchangedByResetToPendingForWorker(o3);
+    verifyResetByResetTasks(o4);
+  }
+
+  @Test
+  public void resetTasksWithUnknownWorkerUUIDs_set_resets_status_of_all_pending_tasks_with_unknown_workers() {
+    long startedAt = 2_099_888L;
+    CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_1, startedAt);
+    CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_1, startedAt);
+    CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_1, startedAt);
+    CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_1, startedAt);
+    CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_2, startedAt);
+    CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_2, startedAt);
+    CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_2, startedAt);
+    CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_2, startedAt);
+
+    underTestAlwaysIncreasingSystem2.resetTasksWithUnknownWorkerUUIDs(db.getSession(), ImmutableSet.of(WORKER_UUID_1, "unknown"));
+
+    verifyUnchangedByResetToPendingForWorker(u1);
+    verifyUnchangedByResetToPendingForWorker(u2);
+    verifyUnchangedByResetToPendingForWorker(u3);
+    verifyUnchangedByResetToPendingForWorker(u4);
+    verifyResetByResetTasks(o1);
+    verifyUnchangedByResetToPendingForWorker(o2);
+    verifyUnchangedByResetToPendingForWorker(o3);
+    verifyResetByResetTasks(o4);
+  }
+
+  private void verifyResetByResetTasks(CeQueueDto original) {
+    CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
+    assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING).isNotEqualTo(original.getStatus());
+    assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount());
+    assertThat(dto.getStartedAt()).isNull();
+    assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
+    assertThat(dto.getUpdatedAt()).isGreaterThan(original.getUpdatedAt());
+    assertThat(dto.getWorkerUuid()).isNull();
+  }
+
   private void verifyResetToPendingForWorker(CeQueueDto original) {
     CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
     assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING);