]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-8987 worker reset any in progress task it has when peeking
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Wed, 29 Mar 2017 13:16:28 +0000 (15:16 +0200)
committerEric Hartmann <hartmann.eric@gmail.Com>
Thu, 27 Apr 2017 07:23:18 +0000 (09:23 +0200)
server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java
server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java
server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java
server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java
server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml
server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java

index 8f899d4b2e6a3bdf7982b6ee128be9157c7b9715..8e5bc19bf8fba80bdcf07d191d19ba97c20698b5 100644 (file)
@@ -31,11 +31,13 @@ import javax.annotation.Nullable;
 import org.apache.log4j.Logger;
 import org.sonar.api.ce.ComputeEngineSide;
 import org.sonar.api.utils.System2;
+import org.sonar.api.utils.log.Loggers;
 import org.sonar.ce.monitoring.CEQueueStatus;
 import org.sonar.core.util.UuidFactory;
 import org.sonar.db.DbClient;
 import org.sonar.db.DbSession;
 import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeQueueDao;
 import org.sonar.db.ce.CeQueueDto;
 import org.sonar.server.organization.DefaultOrganizationProvider;
 
@@ -45,6 +47,7 @@ import static java.util.Objects.requireNonNull;
 
 @ComputeEngineSide
 public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue {
+  private static final org.sonar.api.utils.log.Logger LOG = Loggers.get(InternalCeQueueImpl.class);
 
   private static final int MAX_EXECUTION_COUNT = 2;
 
@@ -71,7 +74,12 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
       return Optional.empty();
     }
     try (DbSession dbSession = dbClient.openSession(false)) {
-      Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession, workerUuid, MAX_EXECUTION_COUNT);
+      CeQueueDao ceQueueDao = dbClient.ceQueueDao();
+      int i = ceQueueDao.resetToPendingForWorker(dbSession, workerUuid);
+      if (i > 0) {
+        LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid);
+      }
+      Optional<CeQueueDto> dto = ceQueueDao.peek(dbSession, workerUuid, MAX_EXECUTION_COUNT);
       CeTask task = null;
       if (dto.isPresent()) {
         task = loadTask(dbSession, dto.get());
index 517a0a7651dac283fb820d5f7dbace52ca7bf7ec..eb6d9d2ee244e0978e389f239ec33678a736855e 100644 (file)
@@ -20,6 +20,7 @@
 package org.sonar.ce.taskprocessor;
 
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import org.sonar.api.utils.log.Logger;
 import org.sonar.api.utils.log.Loggers;
@@ -61,9 +62,10 @@ public class CeWorkerCallableImpl implements CeWorkerCallable {
     return true;
   }
 
+  private static final AtomicLong counter = new AtomicLong(0);
   private Optional<CeTask> tryAndFindTaskToExecute() {
     try {
-      return queue.peek("UNKNOWN" /*FIXME provide a real worker uuid*/);
+      return queue.peek("uuid" + counter.addAndGet(100));
     } catch (Exception e) {
       LOG.error("Failed to pop the queue of analysis reports", e);
     }
index 071bd177845852977c403975167222a5d1f2249f..93340a397583e7f50505d51cbd9df152ba02fcb0 100644 (file)
@@ -31,7 +31,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.sonar.api.utils.System2;
-import org.sonar.api.utils.internal.TestSystem2;
+import org.sonar.api.utils.internal.AlwaysIncreasingSystem2;
 import org.sonar.ce.monitoring.CEQueueStatus;
 import org.sonar.ce.monitoring.CEQueueStatusImpl;
 import org.sonar.core.util.UuidFactory;
@@ -47,6 +47,7 @@ import org.sonar.db.organization.OrganizationDto;
 import org.sonar.server.organization.DefaultOrganization;
 import org.sonar.server.organization.DefaultOrganizationProvider;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
@@ -58,7 +59,7 @@ public class InternalCeQueueImplTest {
   private static final String WORKER_UUID_1 = "worker uuid 1";
   private static final String WORKER_UUID_2 = "worker uuid 2";
 
-  private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L);
+  private System2 system2 = new AlwaysIncreasingSystem2();
 
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
@@ -351,6 +352,110 @@ public class InternalCeQueueImplTest {
     assertThat(underTest.peek(WORKER_UUID_1).isPresent()).isFalse();
   }
 
+  @Test
+  public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_updates_updatedAt_no_matter_execution_count() {
+    insertPending("u0", "doesn't matter", 0); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB
+    CeQueueDto u1 = insertPending("u1", WORKER_UUID_1, 2);// won't be peeked because it's worn-out
+    CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_1, 3);// will be reset but won't be picked because it's worn-out
+    CeQueueDto u3 = insertPending("u3", WORKER_UUID_1, 1);// will be picked-because older than any of the reset ones
+    CeQueueDto u4 = insertInProgress("u4", WORKER_UUID_1, 1);// will be reset
+
+    assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u0");
+
+    verifyUnmodifiedTask(u1);
+    verifyResetTask(u2);
+    verifyUnmodifiedTask(u3);
+    verifyResetTask(u4);
+  }
+
+  @Test
+  public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_only_this_uuid() {
+    insertPending("u0", "doesn't matter", 0); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB
+    CeQueueDto u1 = insertInProgress("u1", WORKER_UUID_1, 3);
+    CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_2, 3);
+    CeQueueDto u3 = insertInProgress("u3", WORKER_UUID_1, 3);
+    CeQueueDto u4 = insertInProgress("u4", WORKER_UUID_2, 1);
+
+    assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u0");
+
+    verifyResetTask(u1);
+    verifyUnmodifiedTask(u2);
+    verifyResetTask(u3);
+    verifyUnmodifiedTask(u4);
+  }
+
+  @Test
+  public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_peeks_the_oldest_non_worn_out_no_matter_if_it_has_been_reset_or_not() {
+    insertPending("u1", WORKER_UUID_1, 3); // won't be picked because worn out
+    insertInProgress("u2", WORKER_UUID_1, 3); // will be reset but won't be picked because worn out
+    insertPending("u3", WORKER_UUID_1, 0); // will be picked first
+    insertInProgress("u4", WORKER_UUID_1, 1); // will be reset and picked on second call only
+
+    Optional<CeTask> ceTask = underTest.peek(WORKER_UUID_1);
+    assertThat(ceTask.get().getUuid()).isEqualTo("u3");
+
+    // remove first task and do another peek: will pick the reset task since it's now the oldest one
+    underTest.remove(ceTask.get(), CeActivityDto.Status.SUCCESS, null, null);
+    assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u4");
+  }
+
+  @Test
+  public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_peeks_reset_tasks_if_is_the_oldest_non_worn_out() {
+    insertPending("u1", WORKER_UUID_1, 3); // won't be picked because worn out
+    insertInProgress("u2", WORKER_UUID_1, 3); // will be reset but won't be picked because worn out
+    insertInProgress("u3", WORKER_UUID_1, 1); // will be reset and picked
+    insertPending("u4", WORKER_UUID_1, 0); // will be picked second
+
+    Optional<CeTask> ceTask = underTest.peek(WORKER_UUID_1);
+    assertThat(ceTask.get().getUuid()).isEqualTo("u3");
+
+    // remove first task and do another peek: will pick the reset task since it's now the oldest one
+    underTest.remove(ceTask.get(), CeActivityDto.Status.SUCCESS, null, null);
+    assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u4");
+  }
+
+  private void verifyResetTask(CeQueueDto originalDto) {
+    CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(session, originalDto.getUuid()).get();
+    assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING);
+    assertThat(dto.getExecutionCount()).isEqualTo(originalDto.getExecutionCount());
+    assertThat(dto.getCreatedAt()).isEqualTo(originalDto.getCreatedAt());
+    assertThat(dto.getUpdatedAt()).isGreaterThan(originalDto.getUpdatedAt());
+  }
+
+  private void verifyUnmodifiedTask(CeQueueDto originalDto) {
+    CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(session, originalDto.getUuid()).get();
+    assertThat(dto.getStatus()).isEqualTo(originalDto.getStatus());
+    assertThat(dto.getExecutionCount()).isEqualTo(originalDto.getExecutionCount());
+    assertThat(dto.getCreatedAt()).isEqualTo(originalDto.getCreatedAt());
+    assertThat(dto.getUpdatedAt()).isEqualTo(originalDto.getUpdatedAt());
+  }
+
+  private CeQueueDto insertInProgress(String uuid, String workerUuid, int executionCount) {
+    checkArgument(executionCount > 0, "execution count less than 1 does not make sense for an in progress task");
+    CeQueueDto dto = new CeQueueDto()
+      .setUuid(uuid)
+      .setTaskType("foo")
+      .setStatus(CeQueueDto.Status.IN_PROGRESS)
+      .setWorkerUuid(workerUuid)
+      .setExecutionCount(executionCount);
+    dbTester.getDbClient().ceQueueDao().insert(session, dto);
+    dbTester.commit();
+    return dto;
+  }
+
+  private CeQueueDto insertPending(String uuid, String workerUuid, int executionCount) {
+    checkArgument(executionCount > -1, "execution count less than 0 does not make sense for a pending task");
+    CeQueueDto dto = new CeQueueDto()
+      .setUuid(uuid)
+      .setTaskType("foo")
+      .setStatus(CeQueueDto.Status.PENDING)
+      .setWorkerUuid(workerUuid)
+      .setExecutionCount(executionCount);
+    dbTester.getDbClient().ceQueueDao().insert(session, dto);
+    dbTester.commit();
+    return dto;
+  }
+
   @Test
   public void cancel_pending() throws Exception {
     CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
@@ -503,10 +608,11 @@ public class InternalCeQueueImplTest {
   private void verifyCeQueueDtoForTaskSubmit(CeTaskSubmit taskSubmit) {
     Optional<CeQueueDto> queueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), taskSubmit.getUuid());
     assertThat(queueDto.isPresent()).isTrue();
-    assertThat(queueDto.get().getTaskType()).isEqualTo(taskSubmit.getType());
-    assertThat(queueDto.get().getComponentUuid()).isEqualTo(taskSubmit.getComponentUuid());
-    assertThat(queueDto.get().getSubmitterLogin()).isEqualTo(taskSubmit.getSubmitterLogin());
-    assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L);
+    CeQueueDto dto = queueDto.get();
+    assertThat(dto.getTaskType()).isEqualTo(taskSubmit.getType());
+    assertThat(dto.getComponentUuid()).isEqualTo(taskSubmit.getComponentUuid());
+    assertThat(dto.getSubmitterLogin()).isEqualTo(taskSubmit.getSubmitterLogin());
+    assertThat(dto.getCreatedAt()).isEqualTo(dto.getUpdatedAt()).isNotNull();
   }
 
   private ComponentDto newComponentDto(String uuid) {
index 05c6112ff57bf5ca50d893946921b5861420e1eb..4fc5aee88b435a8a5552a617e4ed39211a261492 100644 (file)
@@ -24,6 +24,7 @@ import java.util.Optional;
 import javax.annotation.Nullable;
 import org.junit.Rule;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
 import org.sonar.api.utils.log.LogTester;
@@ -36,16 +37,16 @@ import org.sonar.db.ce.CeTaskTypes;
 import org.sonar.server.computation.task.projectanalysis.taskprocessor.ReportTaskProcessor;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
 public class CeWorkerCallableImplTest {
 
-  private static final String UNKNOWN_WORKER_UUID = "UNKNOWN";
-
   @Rule
   public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule();
   @Rule
@@ -54,12 +55,13 @@ public class CeWorkerCallableImplTest {
   private InternalCeQueue queue = mock(InternalCeQueue.class);
   private ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
   private CeLogging ceLogging = spy(CeLogging.class);
+  private ArgumentCaptor<String> workerUuid = ArgumentCaptor.forClass(String.class);
   private CeWorkerCallable underTest = new CeWorkerCallableImpl(queue, ceLogging, taskProcessorRepository);
   private InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
 
   @Test
   public void no_pending_tasks_in_queue() throws Exception {
-    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.empty());
+    when(queue.peek(anyString())).thenReturn(Optional.empty());
 
     assertThat(underTest.call()).isFalse();
 
@@ -70,10 +72,11 @@ public class CeWorkerCallableImplTest {
   public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception {
     CeTask task = createCeTask(null);
     taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
-    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task));
+    when(queue.peek(anyString())).thenReturn(Optional.of(task));
 
     assertThat(underTest.call()).isTrue();
 
+    verifyWorkerUuid();
     inOrder.verify(ceLogging).initForTask(task);
     inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, null);
     inOrder.verify(ceLogging).clearForTask();
@@ -83,10 +86,11 @@ public class CeWorkerCallableImplTest {
   public void peek_and_process_task() throws Exception {
     CeTask task = createCeTask(null);
     taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
-    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task));
+    when(queue.peek(anyString())).thenReturn(Optional.of(task));
 
     assertThat(underTest.call()).isTrue();
 
+    verifyWorkerUuid();
     inOrder.verify(ceLogging).initForTask(task);
     inOrder.verify(taskProcessor).process(task);
     inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS, null, null);
@@ -96,12 +100,13 @@ public class CeWorkerCallableImplTest {
   @Test
   public void fail_to_process_task() throws Exception {
     CeTask task = createCeTask(null);
-    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task));
+    when(queue.peek(anyString())).thenReturn(Optional.of(task));
     taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
     Throwable error = makeTaskProcessorFail(task);
 
     assertThat(underTest.call()).isTrue();
 
+    verifyWorkerUuid();
     inOrder.verify(ceLogging).initForTask(task);
     inOrder.verify(taskProcessor).process(task);
     inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, error);
@@ -110,11 +115,12 @@ public class CeWorkerCallableImplTest {
 
   @Test
   public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_success() throws Exception {
-    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask(null)));
+    when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask(null)));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
 
     underTest.call();
 
+    verifyWorkerUuid();
     List<String> logs = logTester.logs(LoggerLevel.INFO);
     assertThat(logs).hasSize(2);
     for (int i = 0; i < 2; i++) {
@@ -125,12 +131,13 @@ public class CeWorkerCallableImplTest {
   @Test
   public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_error() throws Exception {
     CeTask ceTask = createCeTask(null);
-    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
     makeTaskProcessorFail(ceTask);
 
     underTest.call();
 
+    verifyWorkerUuid();
     List<String> logs = logTester.logs(LoggerLevel.INFO);
     assertThat(logs).hasSize(1);
     assertThat(logs.get(0)).doesNotContain(" | submitter=");
@@ -144,11 +151,12 @@ public class CeWorkerCallableImplTest {
 
   @Test
   public void display_submitterLogin_in_logs_when_set_in_case_of_success() throws Exception {
-    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask("FooBar")));
+    when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask("FooBar")));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
 
     underTest.call();
 
+    verifyWorkerUuid();
     List<String> logs = logTester.logs(LoggerLevel.INFO);
     assertThat(logs).hasSize(2);
     assertThat(logs.get(0)).contains(" | submitter=FooBar");
@@ -160,12 +168,13 @@ public class CeWorkerCallableImplTest {
   @Test
   public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception {
     CeTask ceTask = createCeTask("FooBar");
-    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
     makeTaskProcessorFail(ceTask);
 
     underTest.call();
 
+    verifyWorkerUuid();
     List<String> logs = logTester.logs(LoggerLevel.INFO);
     assertThat(logs).hasSize(1);
     assertThat(logs.iterator().next()).contains(" | submitter=FooBar");
@@ -179,11 +188,12 @@ public class CeWorkerCallableImplTest {
   public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception {
     logTester.setLevel(LoggerLevel.DEBUG);
 
-    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask("FooBar")));
+    when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask("FooBar")));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
 
     underTest.call();
 
+    verifyWorkerUuid();
     List<String> logs = logTester.logs(LoggerLevel.INFO);
     assertThat(logs).hasSize(2);
     assertThat(logs.get(0)).contains(" | submitter=FooBar");
@@ -197,12 +207,13 @@ public class CeWorkerCallableImplTest {
     logTester.setLevel(LoggerLevel.DEBUG);
 
     CeTask ceTask = createCeTask("FooBar");
-    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
     makeTaskProcessorFail(ceTask);
 
     underTest.call();
 
+    verifyWorkerUuid();
     List<String> logs = logTester.logs(LoggerLevel.INFO);
     assertThat(logs).hasSize(1);
     assertThat(logs.iterator().next()).contains(" | submitter=FooBar");
@@ -213,6 +224,11 @@ public class CeWorkerCallableImplTest {
     assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
   }
 
+  private void verifyWorkerUuid() {
+    verify(queue).peek(workerUuid.capture());
+    assertThat(workerUuid.getValue()).startsWith("uuid");
+  }
+
   private static CeTask createCeTask(@Nullable String submitterLogin) {
     return new CeTask.Builder()
       .setOrganizationUuid("org1")
index 589f2e200c1f87abe8dd45d1171f93ab001801b6..c4a747cd0785d33d80227f4c06ead5eaf6c95946 100644 (file)
@@ -106,6 +106,14 @@ public class CeQueueDao implements Dao {
     mapper(session).resetAllToPendingStatus(system2.now());
   }
 
+  /**
+   * Update all tasks for the specified worker uuid which are not PENDING to:
+   * STATUS='PENDING', STARTED_AT=NULL, UPDATED_AT={now}.
+   */
+  public int resetToPendingForWorker(DbSession session, String workerUuid) {
+    return mapper(session).resetToPendingForWorker(workerUuid, system2.now());
+  }
+
   public int countByStatus(DbSession dbSession, CeQueueDto.Status status) {
     return mapper(dbSession).countByStatusAndComponentUuid(status, null);
   }
index 2d82fb11ea41095ba94aa66f9ee797970de8e66b..e08b952c1b708221f8e08451b362332b356055f1 100644 (file)
@@ -52,6 +52,8 @@ public interface CeQueueMapper {
 
   void resetAllToPendingStatus(@Param("updatedAt") long updatedAt);
 
+  int resetToPendingForWorker(@Param("workerUuid") String workerUuid, @Param("updatedAt") long updatedAt);
+
   int updateIf(@Param("uuid") String uuid,
     @Param("new") UpdateIf.NewProperties newProperties,
     @Param("old") UpdateIf.OldProperties oldProperties);
index df76f8c561efaacf03c8b8cfc6735d3ff0efa693..90f60a858063634305083869ac627f0a45b6727d 100644 (file)
       status &lt;&gt; 'PENDING'
   </update>
 
+  <update id="resetToPendingForWorker">
+    update ce_queue set
+      status='PENDING',
+      started_at=NULL,
+      updated_at=#{updatedAt,jdbcType=BIGINT}
+    where
+      status &lt;&gt; 'PENDING'
+      and worker_uuid = #{workerUuid,jdbcType=VARCHAR}
+  </update>
+
   <update id="updateIf" parameterType="map">
     update ce_queue set
       status=#{new.status,jdbcType=VARCHAR},
index 0bcc63c25218832d75bc912878b43ebd5cd03fff..15605396f9dd9b56f7866f94428256928003afe3 100644 (file)
@@ -256,6 +256,50 @@ public class CeQueueDaoTest {
     assertThat(saved.getExecutionCount()).isEqualTo(EXECUTION_COUNT);
   }
 
+  @Test
+  public void resetToPendingForWorker_resets_status_of_non_pending_tasks_only_for_specified_workerUuid() {
+    long startedAt = 2_099_888L;
+    CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_1, startedAt);
+    CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_1, startedAt);
+    CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_1, startedAt);
+    CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_1, startedAt);
+    CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_2, startedAt);
+    CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_2, startedAt);
+    CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_2, startedAt);
+    CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_2, startedAt);
+
+    underTestAlwaysIncreasingSystem2.resetToPendingForWorker(db.getSession(), WORKER_UUID_1);
+
+    verifyResetToPendingForWorker(u1);
+    verifyUnchangedByResetToPendingForWorker(u2);
+    verifyUnchangedByResetToPendingForWorker(u3);
+    verifyResetToPendingForWorker(u4);
+    verifyUnchangedByResetToPendingForWorker(o1);
+    verifyUnchangedByResetToPendingForWorker(o2);
+    verifyUnchangedByResetToPendingForWorker(o3);
+    verifyUnchangedByResetToPendingForWorker(o4);
+  }
+
+  private void verifyResetToPendingForWorker(CeQueueDto original) {
+    CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
+    assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING);
+    assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount());
+    assertThat(dto.getStartedAt()).isNull();
+    assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
+    assertThat(dto.getUpdatedAt()).isGreaterThan(original.getUpdatedAt());
+    assertThat(dto.getWorkerUuid()).isEqualTo(original.getWorkerUuid());
+  }
+
+  private void verifyUnchangedByResetToPendingForWorker(CeQueueDto original) {
+    CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
+    assertThat(dto.getStatus()).isEqualTo(original.getStatus());
+    assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount());
+    assertThat(dto.getStartedAt()).isEqualTo(original.getStartedAt());
+    assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
+    assertThat(dto.getUpdatedAt()).isEqualTo(original.getUpdatedAt());
+    assertThat(dto.getWorkerUuid()).isEqualTo(original.getWorkerUuid());
+  }
+
   @Test
   public void peek_none_if_no_pendings() throws Exception {
     assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT).isPresent()).isFalse();
@@ -510,6 +554,20 @@ public class CeQueueDaoTest {
     return dto;
   }
 
+  private CeQueueDto insert(String uuid, CeQueueDto.Status status, int executionCount, String workerUuid, Long startedAt) {
+    CeQueueDto dto = new CeQueueDto();
+    dto.setUuid(uuid);
+    dto.setTaskType(CeTaskTypes.REPORT);
+    dto.setStatus(status);
+    dto.setSubmitterLogin("henri");
+    dto.setExecutionCount(executionCount);
+    dto.setWorkerUuid(workerUuid);
+    dto.setStartedAt(startedAt);
+    underTestAlwaysIncreasingSystem2.insert(db.getSession(), dto);
+    db.getSession().commit();
+    return dto;
+  }
+
   private CeQueueDto insert(String uuid, String componentUuid, CeQueueDto.Status status) {
     CeQueueDto dto = new CeQueueDto();
     dto.setUuid(uuid);