From d9770048ef20fa6c100718129166748523db377f Mon Sep 17 00:00:00 2001 From: Pierre Date: Thu, 8 Jun 2023 10:35:08 +0200 Subject: [PATCH] SONAR-19482 fix the CE worker pending reset mechanism to not generate transaction rollback --- .../sonar/ce/queue/InternalCeQueueImpl.java | 20 ++++--- .../it/java/org/sonar/db/ce/CeQueueDaoIT.java | 52 ++++++++++++++----- .../main/java/org/sonar/db/ce/CeQueueDao.java | 12 ++--- .../java/org/sonar/db/ce/CeQueueMapper.java | 4 +- .../org/sonar/db/ce/CeQueueMapper.xml | 15 ++++-- 5 files changed, 74 insertions(+), 29 deletions(-) diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java index 69d319c7aa9..062216822a9 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java @@ -29,6 +29,7 @@ import java.util.Optional; import java.util.Set; import javax.annotation.CheckForNull; import javax.annotation.Nullable; + import org.sonar.api.ce.ComputeEngineSide; import org.sonar.api.utils.System2; import org.sonar.api.utils.log.Logger; @@ -43,7 +44,6 @@ import org.sonar.core.util.UuidFactory; import org.sonar.db.DbClient; import org.sonar.db.DbSession; import org.sonar.db.ce.CeActivityDto; -import org.sonar.db.ce.CeQueueDao; import org.sonar.db.ce.CeQueueDto; import org.sonar.db.ce.CeTaskCharacteristicDto; import org.sonar.db.component.ComponentDto; @@ -82,12 +82,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue return Optional.empty(); } try (DbSession dbSession = dbClient.openSession(false)) { - CeQueueDao ceQueueDao = dbClient.ceQueueDao(); - int i = ceQueueDao.resetToPendingForWorker(dbSession, workerUuid); - if (i > 0) { - dbSession.commit(); - LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid); - } + resetNotPendingTasks(workerUuid, dbSession); Optional opt = nextPendingTaskPicker.findPendingTask(workerUuid, dbSession, excludeIndexationJob); if (opt.isEmpty()) { return Optional.empty(); @@ -105,6 +100,17 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue } } + private void resetNotPendingTasks(String workerUuid, DbSession dbSession) { + List notPendingTasks = dbClient.ceQueueDao().selectNotPendingForWorker(dbSession, workerUuid); + if (!notPendingTasks.isEmpty()) { + for (CeQueueDto pendingTask : notPendingTasks) { + dbClient.ceQueueDao().resetToPendingByUuid(dbSession, pendingTask.getUuid()); + } + dbSession.commit(); + LOG.debug("{} in progress tasks reset for worker uuid {}", notPendingTasks.size(), workerUuid); + } + } + @Override public void remove(CeTask task, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) { checkArgument(error == null || status == CeActivityDto.Status.FAILED, "Error can be provided only when status is FAILED"); diff --git a/server/sonar-db-dao/src/it/java/org/sonar/db/ce/CeQueueDaoIT.java b/server/sonar-db-dao/src/it/java/org/sonar/db/ce/CeQueueDaoIT.java index 92a53d4cb49..0523341c9b2 100644 --- a/server/sonar-db-dao/src/it/java/org/sonar/db/ce/CeQueueDaoIT.java +++ b/server/sonar-db-dao/src/it/java/org/sonar/db/ce/CeQueueDaoIT.java @@ -67,17 +67,17 @@ public class CeQueueDaoIT { private static final String WORKER_UUID_1 = "worker uuid 1"; private static final String WORKER_UUID_2 = "worker uuid 2"; - private TestSystem2 system2 = new TestSystem2().setNow(INIT_TIME); + private final TestSystem2 system2 = new TestSystem2().setNow(INIT_TIME); @Rule public DbTester db = DbTester.create(system2); - private System2 mockedSystem2 = mock(System2.class); - private System2 alwaysIncreasingSystem2 = new AlwaysIncreasingSystem2(); + private final System2 mockedSystem2 = mock(System2.class); + private final System2 alwaysIncreasingSystem2 = new AlwaysIncreasingSystem2(); - private CeQueueDao underTest = new CeQueueDao(system2); - private CeQueueDao underTestWithSystem2Mock = new CeQueueDao(mockedSystem2); - private CeQueueDao underTestAlwaysIncreasingSystem2 = new CeQueueDao(alwaysIncreasingSystem2); + private final CeQueueDao underTest = new CeQueueDao(system2); + private final CeQueueDao underTestWithSystem2Mock = new CeQueueDao(mockedSystem2); + private final CeQueueDao underTestAlwaysIncreasingSystem2 = new CeQueueDao(alwaysIncreasingSystem2); @Test public void insert_populates_createdAt_and_updateAt_from_System2_with_same_value_if_any_is_not_set() { @@ -263,7 +263,7 @@ public class CeQueueDaoIT { @Test public void test_delete_with_expected_status() { insertPending(TASK_UUID_1, MAIN_COMPONENT_UUID_1); - insertInProgress(TASK_UUID_2); + insertInProgress(TASK_UUID_2, "workerUuid", System2.INSTANCE.now()); int deletedCount = underTest.deleteByUuid(db.getSession(), "UNKNOWN", null); assertThat(deletedCount).isZero(); @@ -286,6 +286,31 @@ public class CeQueueDaoIT { assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_2)).isEmpty(); } + @Test + public void selectNotPendingForWorker_return_non_pending_tasks_for_specified_workerUuid() { + long startedAt = alwaysIncreasingSystem2.now(); + insertPending("u1"); + CeQueueDto inProgressTaskWorker1 = insertInProgress("u2", WORKER_UUID_1, startedAt); + insertInProgress("o2", WORKER_UUID_2, startedAt); + + List notPendingForWorker = underTestAlwaysIncreasingSystem2.selectNotPendingForWorker(db.getSession(), WORKER_UUID_1); + + assertThat(notPendingForWorker).extracting(CeQueueDto::getUuid) + .contains(inProgressTaskWorker1.getUuid()); + } + + @Test + public void resetToPendingByUuid_resets_status_of_specific_task() { + long task1startedAt = alwaysIncreasingSystem2.now(); + CeQueueDto task1 = insertInProgress("uuid-1", "workerUuid", task1startedAt); + CeQueueDto task2 = insertInProgress("uuid-2", "workerUuid", alwaysIncreasingSystem2.now()); + + underTestAlwaysIncreasingSystem2.resetToPendingByUuid(db.getSession(), task1.getUuid()); + + verifyResetToPendingForWorker(task1, task1.getWorkerUuid(), task1startedAt); + verifyUnchangedByResetToPendingForWorker(task2); + } + @Test public void resetToPendingForWorker_resets_status_of_non_pending_tasks_only_for_specified_workerUuid() { CeQueueDto[] worker1 = {insertPending("u1"), insertPending("u2"), insertPending("u3"), insertPending("u4")}; @@ -296,7 +321,10 @@ public class CeQueueDaoIT { makeInProgress(WORKER_UUID_2, startedAt, worker2[0]); makeInProgress(WORKER_UUID_2, startedAt, worker2[3]); - underTestAlwaysIncreasingSystem2.resetToPendingForWorker(db.getSession(), WORKER_UUID_1); + List notPendingForWorker = underTestAlwaysIncreasingSystem2.selectNotPendingForWorker(db.getSession(), WORKER_UUID_1); + for (CeQueueDto ceQueueDto : notPendingForWorker) { + underTestAlwaysIncreasingSystem2.resetToPendingByUuid(db.getSession(), ceQueueDto.getUuid()); + } verifyResetToPendingForWorker(worker1[0], WORKER_UUID_1, startedAt); verifyUnchangedByResetToPendingForWorker(worker1[1]); @@ -739,10 +767,10 @@ public class CeQueueDaoIT { return dto; } - private CeQueueDto insertInProgress(String uuid) { - CeQueueDto ceQueueDto = insertPending(uuid); - CeQueueTesting.makeInProgress(db.getSession(), "workerUuid", System2.INSTANCE.now(), ceQueueDto); - return underTest.selectByUuid(db.getSession(), uuid).get(); + private CeQueueDto insertInProgress(String taskUuid, String workerUuid, long now) { + CeQueueDto ceQueueDto = insertPending(taskUuid); + CeQueueTesting.makeInProgress(db.getSession(), workerUuid, now, ceQueueDto); + return underTest.selectByUuid(db.getSession(), taskUuid).get(); } private void insertCharacteristic(String key, String value, String uuid, String taskUuid) { diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java index 6407e45327a..77cf830d594 100644 --- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java +++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java @@ -133,12 +133,12 @@ public class CeQueueDao implements Dao { return mapper(session).deleteByUuid(uuid, deleteIf); } - /** - * Updates all tasks for the specified worker uuid which are not PENDING to: - * STATUS='PENDING', STARTED_AT=NULL, UPDATED_AT={now}. - */ - public int resetToPendingForWorker(DbSession session, String workerUuid) { - return mapper(session).resetToPendingForWorker(workerUuid, system2.now()); + public void resetToPendingByUuid(DbSession session, String uuid) { + mapper(session).resetToPendingByUuid(uuid, system2.now()); + } + + public List selectNotPendingForWorker(DbSession session, String uuid) { + return mapper(session).selectNotPendingForWorker(uuid); } public int countByStatus(DbSession dbSession, CeQueueDto.Status status) { diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java index 52871a46454..4b991dfcbdb 100644 --- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java +++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java @@ -80,7 +80,9 @@ public interface CeQueueMapper { void insert(CeQueueDto dto); - int resetToPendingForWorker(@Param("workerUuid") String workerUuid, @Param("updatedAt") long updatedAt); + List selectNotPendingForWorker(@Param("workerUuid") String workerUuid); + + void resetToPendingByUuid(@Param("uuid") String uuid, @Param("updatedAt") long updatedAt); int updateIf(@Param("uuid") String uuid, @Param("new") UpdateIf.NewProperties newProperties, diff --git a/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml b/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml index 0e33cabba68..550aa6789b4 100644 --- a/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml +++ b/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml @@ -258,13 +258,22 @@ ) - + + + update ce_queue set status='PENDING', updated_at=#{updatedAt,jdbcType=BIGINT} where - status <> 'PENDING' - and worker_uuid = #{workerUuid,jdbcType=VARCHAR} + uuid = #{uuid,jdbcType=VARCHAR} -- 2.39.5