]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-19482 fix the CE worker pending reset mechanism to not generate transaction...
authorPierre <pierre.guillot@sonarsource.com>
Thu, 8 Jun 2023 08:35:08 +0000 (10:35 +0200)
committersonartech <sonartech@sonarsource.com>
Fri, 9 Jun 2023 20:03:09 +0000 (20:03 +0000)
server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
server/sonar-db-dao/src/it/java/org/sonar/db/ce/CeQueueDaoIT.java
server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java
server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java
server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml

index 69d319c7aa93b5ea6de5e3b5175b538450b102e0..062216822a9ab66ea958975dd63853b0bb2da059 100644 (file)
@@ -29,6 +29,7 @@ import java.util.Optional;
 import java.util.Set;
 import javax.annotation.CheckForNull;
 import javax.annotation.Nullable;
+
 import org.sonar.api.ce.ComputeEngineSide;
 import org.sonar.api.utils.System2;
 import org.sonar.api.utils.log.Logger;
@@ -43,7 +44,6 @@ import org.sonar.core.util.UuidFactory;
 import org.sonar.db.DbClient;
 import org.sonar.db.DbSession;
 import org.sonar.db.ce.CeActivityDto;
-import org.sonar.db.ce.CeQueueDao;
 import org.sonar.db.ce.CeQueueDto;
 import org.sonar.db.ce.CeTaskCharacteristicDto;
 import org.sonar.db.component.ComponentDto;
@@ -82,12 +82,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
       return Optional.empty();
     }
     try (DbSession dbSession = dbClient.openSession(false)) {
-      CeQueueDao ceQueueDao = dbClient.ceQueueDao();
-      int i = ceQueueDao.resetToPendingForWorker(dbSession, workerUuid);
-      if (i > 0) {
-        dbSession.commit();
-        LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid);
-      }
+      resetNotPendingTasks(workerUuid, dbSession);
       Optional<CeQueueDto> opt = nextPendingTaskPicker.findPendingTask(workerUuid, dbSession, excludeIndexationJob);
       if (opt.isEmpty()) {
         return Optional.empty();
@@ -105,6 +100,17 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
     }
   }
 
+  private void resetNotPendingTasks(String workerUuid, DbSession dbSession) {
+    List<CeQueueDto> notPendingTasks = dbClient.ceQueueDao().selectNotPendingForWorker(dbSession, workerUuid);
+    if (!notPendingTasks.isEmpty()) {
+      for (CeQueueDto pendingTask : notPendingTasks) {
+        dbClient.ceQueueDao().resetToPendingByUuid(dbSession, pendingTask.getUuid());
+      }
+      dbSession.commit();
+      LOG.debug("{} in progress tasks reset for worker uuid {}", notPendingTasks.size(), workerUuid);
+    }
+  }
+
   @Override
   public void remove(CeTask task, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
     checkArgument(error == null || status == CeActivityDto.Status.FAILED, "Error can be provided only when status is FAILED");
index 92a53d4cb493ccbe7dfbaf72faff1c763fecfc24..0523341c9b2c387257518367aba2d07cf5c5fc55 100644 (file)
@@ -67,17 +67,17 @@ public class CeQueueDaoIT {
   private static final String WORKER_UUID_1 = "worker uuid 1";
   private static final String WORKER_UUID_2 = "worker uuid 2";
 
-  private TestSystem2 system2 = new TestSystem2().setNow(INIT_TIME);
+  private final TestSystem2 system2 = new TestSystem2().setNow(INIT_TIME);
 
   @Rule
   public DbTester db = DbTester.create(system2);
 
-  private System2 mockedSystem2 = mock(System2.class);
-  private System2 alwaysIncreasingSystem2 = new AlwaysIncreasingSystem2();
+  private final System2 mockedSystem2 = mock(System2.class);
+  private final System2 alwaysIncreasingSystem2 = new AlwaysIncreasingSystem2();
 
-  private CeQueueDao underTest = new CeQueueDao(system2);
-  private CeQueueDao underTestWithSystem2Mock = new CeQueueDao(mockedSystem2);
-  private CeQueueDao underTestAlwaysIncreasingSystem2 = new CeQueueDao(alwaysIncreasingSystem2);
+  private final CeQueueDao underTest = new CeQueueDao(system2);
+  private final CeQueueDao underTestWithSystem2Mock = new CeQueueDao(mockedSystem2);
+  private final CeQueueDao underTestAlwaysIncreasingSystem2 = new CeQueueDao(alwaysIncreasingSystem2);
 
   @Test
   public void insert_populates_createdAt_and_updateAt_from_System2_with_same_value_if_any_is_not_set() {
@@ -263,7 +263,7 @@ public class CeQueueDaoIT {
   @Test
   public void test_delete_with_expected_status() {
     insertPending(TASK_UUID_1, MAIN_COMPONENT_UUID_1);
-    insertInProgress(TASK_UUID_2);
+    insertInProgress(TASK_UUID_2, "workerUuid", System2.INSTANCE.now());
 
     int deletedCount = underTest.deleteByUuid(db.getSession(), "UNKNOWN", null);
     assertThat(deletedCount).isZero();
@@ -286,6 +286,31 @@ public class CeQueueDaoIT {
     assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_2)).isEmpty();
   }
 
+  @Test
+  public void selectNotPendingForWorker_return_non_pending_tasks_for_specified_workerUuid() {
+    long startedAt = alwaysIncreasingSystem2.now();
+    insertPending("u1");
+    CeQueueDto inProgressTaskWorker1 = insertInProgress("u2", WORKER_UUID_1, startedAt);
+    insertInProgress("o2", WORKER_UUID_2, startedAt);
+
+    List<CeQueueDto> notPendingForWorker = underTestAlwaysIncreasingSystem2.selectNotPendingForWorker(db.getSession(), WORKER_UUID_1);
+
+    assertThat(notPendingForWorker).extracting(CeQueueDto::getUuid)
+            .contains(inProgressTaskWorker1.getUuid());
+  }
+
+  @Test
+  public void resetToPendingByUuid_resets_status_of_specific_task() {
+    long task1startedAt = alwaysIncreasingSystem2.now();
+    CeQueueDto task1 = insertInProgress("uuid-1", "workerUuid", task1startedAt);
+    CeQueueDto task2 = insertInProgress("uuid-2", "workerUuid", alwaysIncreasingSystem2.now());
+
+    underTestAlwaysIncreasingSystem2.resetToPendingByUuid(db.getSession(), task1.getUuid());
+
+    verifyResetToPendingForWorker(task1, task1.getWorkerUuid(), task1startedAt);
+    verifyUnchangedByResetToPendingForWorker(task2);
+  }
+
   @Test
   public void resetToPendingForWorker_resets_status_of_non_pending_tasks_only_for_specified_workerUuid() {
     CeQueueDto[] worker1 = {insertPending("u1"), insertPending("u2"), insertPending("u3"), insertPending("u4")};
@@ -296,7 +321,10 @@ public class CeQueueDaoIT {
     makeInProgress(WORKER_UUID_2, startedAt, worker2[0]);
     makeInProgress(WORKER_UUID_2, startedAt, worker2[3]);
 
-    underTestAlwaysIncreasingSystem2.resetToPendingForWorker(db.getSession(), WORKER_UUID_1);
+    List<CeQueueDto> notPendingForWorker = underTestAlwaysIncreasingSystem2.selectNotPendingForWorker(db.getSession(), WORKER_UUID_1);
+    for (CeQueueDto ceQueueDto : notPendingForWorker) {
+      underTestAlwaysIncreasingSystem2.resetToPendingByUuid(db.getSession(), ceQueueDto.getUuid());
+    }
 
     verifyResetToPendingForWorker(worker1[0], WORKER_UUID_1, startedAt);
     verifyUnchangedByResetToPendingForWorker(worker1[1]);
@@ -739,10 +767,10 @@ public class CeQueueDaoIT {
     return dto;
   }
 
-  private CeQueueDto insertInProgress(String uuid) {
-    CeQueueDto ceQueueDto = insertPending(uuid);
-    CeQueueTesting.makeInProgress(db.getSession(), "workerUuid", System2.INSTANCE.now(), ceQueueDto);
-    return underTest.selectByUuid(db.getSession(), uuid).get();
+  private CeQueueDto insertInProgress(String taskUuid, String workerUuid, long now) {
+    CeQueueDto ceQueueDto = insertPending(taskUuid);
+    CeQueueTesting.makeInProgress(db.getSession(), workerUuid, now, ceQueueDto);
+    return underTest.selectByUuid(db.getSession(), taskUuid).get();
   }
 
   private void insertCharacteristic(String key, String value, String uuid, String taskUuid) {
index 6407e45327a823bdc46c8af5bb91a9a91c58f895..77cf830d594901c1f1bdf7dcc01478c57094a894 100644 (file)
@@ -133,12 +133,12 @@ public class CeQueueDao implements Dao {
     return mapper(session).deleteByUuid(uuid, deleteIf);
   }
 
-  /**
-   * Updates all tasks for the specified worker uuid which are not PENDING to:
-   * STATUS='PENDING', STARTED_AT=NULL, UPDATED_AT={now}.
-   */
-  public int resetToPendingForWorker(DbSession session, String workerUuid) {
-    return mapper(session).resetToPendingForWorker(workerUuid, system2.now());
+  public void resetToPendingByUuid(DbSession session, String uuid) {
+    mapper(session).resetToPendingByUuid(uuid, system2.now());
+  }
+
+  public List<CeQueueDto> selectNotPendingForWorker(DbSession session, String uuid) {
+    return mapper(session).selectNotPendingForWorker(uuid);
   }
 
   public int countByStatus(DbSession dbSession, CeQueueDto.Status status) {
index 52871a4645491e985acdc6470dab6c29ef4b9fa7..4b991dfcbdb58c0f2def3510f6494d7266d93480 100644 (file)
@@ -80,7 +80,9 @@ public interface CeQueueMapper {
 
   void insert(CeQueueDto dto);
 
-  int resetToPendingForWorker(@Param("workerUuid") String workerUuid, @Param("updatedAt") long updatedAt);
+  List<CeQueueDto> selectNotPendingForWorker(@Param("workerUuid") String workerUuid);
+
+  void resetToPendingByUuid(@Param("uuid") String uuid, @Param("updatedAt") long updatedAt);
 
   int updateIf(@Param("uuid") String uuid,
     @Param("new") UpdateIf.NewProperties newProperties,
index 0e33cabba689e7615a17fe71fc48c652942da064..550aa6789b46ff2792a4ff38b9e99a77e994143a 100644 (file)
     )
   </insert>
 
-  <update id="resetToPendingForWorker">
+  <select id="selectNotPendingForWorker" resultType="org.sonar.db.ce.CeQueueDto">
+    select
+    <include refid="columns"/>
+    from
+    ce_queue cq
+    where
+    cq.status &lt;&gt; 'PENDING'
+    and worker_uuid = #{workerUuid,jdbcType=VARCHAR}
+  </select>
+
+  <update id="resetToPendingByUuid">
     update ce_queue set
       status='PENDING',
       updated_at=#{updatedAt,jdbcType=BIGINT}
     where
-      status &lt;&gt; 'PENDING'
-      and worker_uuid = #{workerUuid,jdbcType=VARCHAR}
+      uuid = #{uuid,jdbcType=VARCHAR}
   </update>
 
   <update id="updateIf" parameterType="map">