From d463c9d9db163c0dca93a8ac720b50d5251fd5f9 Mon Sep 17 00:00:00 2001 From: =?utf8?q?S=C3=A9bastien=20Lesaint?= Date: Wed, 29 Mar 2017 15:16:28 +0200 Subject: [PATCH] SONAR-8987 worker reset any in progress task it has when peeking --- .../sonar/ce/queue/InternalCeQueueImpl.java | 10 +- .../taskprocessor/CeWorkerCallableImpl.java | 4 +- .../ce/queue/InternalCeQueueImplTest.java | 118 +++++++++++++++++- .../CeWorkerCallableImplTest.java | 40 ++++-- .../main/java/org/sonar/db/ce/CeQueueDao.java | 8 ++ .../java/org/sonar/db/ce/CeQueueMapper.java | 2 + .../org/sonar/db/ce/CeQueueMapper.xml | 10 ++ .../java/org/sonar/db/ce/CeQueueDaoTest.java | 58 +++++++++ 8 files changed, 230 insertions(+), 20 deletions(-) diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java index 8f899d4b2e6..8e5bc19bf8f 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java @@ -31,11 +31,13 @@ import javax.annotation.Nullable; import org.apache.log4j.Logger; import org.sonar.api.ce.ComputeEngineSide; import org.sonar.api.utils.System2; +import org.sonar.api.utils.log.Loggers; import org.sonar.ce.monitoring.CEQueueStatus; import org.sonar.core.util.UuidFactory; import org.sonar.db.DbClient; import org.sonar.db.DbSession; import org.sonar.db.ce.CeActivityDto; +import org.sonar.db.ce.CeQueueDao; import org.sonar.db.ce.CeQueueDto; import org.sonar.server.organization.DefaultOrganizationProvider; @@ -45,6 +47,7 @@ import static java.util.Objects.requireNonNull; @ComputeEngineSide public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue { + private static final org.sonar.api.utils.log.Logger LOG = Loggers.get(InternalCeQueueImpl.class); private static final int MAX_EXECUTION_COUNT = 2; @@ -71,7 +74,12 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue return Optional.empty(); } try (DbSession dbSession = dbClient.openSession(false)) { - Optional dto = dbClient.ceQueueDao().peek(dbSession, workerUuid, MAX_EXECUTION_COUNT); + CeQueueDao ceQueueDao = dbClient.ceQueueDao(); + int i = ceQueueDao.resetToPendingForWorker(dbSession, workerUuid); + if (i > 0) { + LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid); + } + Optional dto = ceQueueDao.peek(dbSession, workerUuid, MAX_EXECUTION_COUNT); CeTask task = null; if (dto.isPresent()) { task = loadTask(dbSession, dto.get()); diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java index 517a0a7651d..eb6d9d2ee24 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java @@ -20,6 +20,7 @@ package org.sonar.ce.taskprocessor; import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import org.sonar.api.utils.log.Logger; import org.sonar.api.utils.log.Loggers; @@ -61,9 +62,10 @@ public class CeWorkerCallableImpl implements CeWorkerCallable { return true; } + private static final AtomicLong counter = new AtomicLong(0); private Optional tryAndFindTaskToExecute() { try { - return queue.peek("UNKNOWN" /*FIXME provide a real worker uuid*/); + return queue.peek("uuid" + counter.addAndGet(100)); } catch (Exception e) { LOG.error("Failed to pop the queue of analysis reports", e); } diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java index 071bd177845..93340a39758 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java @@ -31,7 +31,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.sonar.api.utils.System2; -import org.sonar.api.utils.internal.TestSystem2; +import org.sonar.api.utils.internal.AlwaysIncreasingSystem2; import org.sonar.ce.monitoring.CEQueueStatus; import org.sonar.ce.monitoring.CEQueueStatusImpl; import org.sonar.core.util.UuidFactory; @@ -47,6 +47,7 @@ import org.sonar.db.organization.OrganizationDto; import org.sonar.server.organization.DefaultOrganization; import org.sonar.server.organization.DefaultOrganizationProvider; +import static com.google.common.base.Preconditions.checkArgument; import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -58,7 +59,7 @@ public class InternalCeQueueImplTest { private static final String WORKER_UUID_1 = "worker uuid 1"; private static final String WORKER_UUID_2 = "worker uuid 2"; - private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L); + private System2 system2 = new AlwaysIncreasingSystem2(); @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -351,6 +352,110 @@ public class InternalCeQueueImplTest { assertThat(underTest.peek(WORKER_UUID_1).isPresent()).isFalse(); } + @Test + public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_updates_updatedAt_no_matter_execution_count() { + insertPending("u0", "doesn't matter", 0); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB + CeQueueDto u1 = insertPending("u1", WORKER_UUID_1, 2);// won't be peeked because it's worn-out + CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_1, 3);// will be reset but won't be picked because it's worn-out + CeQueueDto u3 = insertPending("u3", WORKER_UUID_1, 1);// will be picked-because older than any of the reset ones + CeQueueDto u4 = insertInProgress("u4", WORKER_UUID_1, 1);// will be reset + + assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u0"); + + verifyUnmodifiedTask(u1); + verifyResetTask(u2); + verifyUnmodifiedTask(u3); + verifyResetTask(u4); + } + + @Test + public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_only_this_uuid() { + insertPending("u0", "doesn't matter", 0); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB + CeQueueDto u1 = insertInProgress("u1", WORKER_UUID_1, 3); + CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_2, 3); + CeQueueDto u3 = insertInProgress("u3", WORKER_UUID_1, 3); + CeQueueDto u4 = insertInProgress("u4", WORKER_UUID_2, 1); + + assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u0"); + + verifyResetTask(u1); + verifyUnmodifiedTask(u2); + verifyResetTask(u3); + verifyUnmodifiedTask(u4); + } + + @Test + public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_peeks_the_oldest_non_worn_out_no_matter_if_it_has_been_reset_or_not() { + insertPending("u1", WORKER_UUID_1, 3); // won't be picked because worn out + insertInProgress("u2", WORKER_UUID_1, 3); // will be reset but won't be picked because worn out + insertPending("u3", WORKER_UUID_1, 0); // will be picked first + insertInProgress("u4", WORKER_UUID_1, 1); // will be reset and picked on second call only + + Optional ceTask = underTest.peek(WORKER_UUID_1); + assertThat(ceTask.get().getUuid()).isEqualTo("u3"); + + // remove first task and do another peek: will pick the reset task since it's now the oldest one + underTest.remove(ceTask.get(), CeActivityDto.Status.SUCCESS, null, null); + assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u4"); + } + + @Test + public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_peeks_reset_tasks_if_is_the_oldest_non_worn_out() { + insertPending("u1", WORKER_UUID_1, 3); // won't be picked because worn out + insertInProgress("u2", WORKER_UUID_1, 3); // will be reset but won't be picked because worn out + insertInProgress("u3", WORKER_UUID_1, 1); // will be reset and picked + insertPending("u4", WORKER_UUID_1, 0); // will be picked second + + Optional ceTask = underTest.peek(WORKER_UUID_1); + assertThat(ceTask.get().getUuid()).isEqualTo("u3"); + + // remove first task and do another peek: will pick the reset task since it's now the oldest one + underTest.remove(ceTask.get(), CeActivityDto.Status.SUCCESS, null, null); + assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u4"); + } + + private void verifyResetTask(CeQueueDto originalDto) { + CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(session, originalDto.getUuid()).get(); + assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING); + assertThat(dto.getExecutionCount()).isEqualTo(originalDto.getExecutionCount()); + assertThat(dto.getCreatedAt()).isEqualTo(originalDto.getCreatedAt()); + assertThat(dto.getUpdatedAt()).isGreaterThan(originalDto.getUpdatedAt()); + } + + private void verifyUnmodifiedTask(CeQueueDto originalDto) { + CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(session, originalDto.getUuid()).get(); + assertThat(dto.getStatus()).isEqualTo(originalDto.getStatus()); + assertThat(dto.getExecutionCount()).isEqualTo(originalDto.getExecutionCount()); + assertThat(dto.getCreatedAt()).isEqualTo(originalDto.getCreatedAt()); + assertThat(dto.getUpdatedAt()).isEqualTo(originalDto.getUpdatedAt()); + } + + private CeQueueDto insertInProgress(String uuid, String workerUuid, int executionCount) { + checkArgument(executionCount > 0, "execution count less than 1 does not make sense for an in progress task"); + CeQueueDto dto = new CeQueueDto() + .setUuid(uuid) + .setTaskType("foo") + .setStatus(CeQueueDto.Status.IN_PROGRESS) + .setWorkerUuid(workerUuid) + .setExecutionCount(executionCount); + dbTester.getDbClient().ceQueueDao().insert(session, dto); + dbTester.commit(); + return dto; + } + + private CeQueueDto insertPending(String uuid, String workerUuid, int executionCount) { + checkArgument(executionCount > -1, "execution count less than 0 does not make sense for a pending task"); + CeQueueDto dto = new CeQueueDto() + .setUuid(uuid) + .setTaskType("foo") + .setStatus(CeQueueDto.Status.PENDING) + .setWorkerUuid(workerUuid) + .setExecutionCount(executionCount); + dbTester.getDbClient().ceQueueDao().insert(session, dto); + dbTester.commit(); + return dto; + } + @Test public void cancel_pending() throws Exception { CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); @@ -503,10 +608,11 @@ public class InternalCeQueueImplTest { private void verifyCeQueueDtoForTaskSubmit(CeTaskSubmit taskSubmit) { Optional queueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), taskSubmit.getUuid()); assertThat(queueDto.isPresent()).isTrue(); - assertThat(queueDto.get().getTaskType()).isEqualTo(taskSubmit.getType()); - assertThat(queueDto.get().getComponentUuid()).isEqualTo(taskSubmit.getComponentUuid()); - assertThat(queueDto.get().getSubmitterLogin()).isEqualTo(taskSubmit.getSubmitterLogin()); - assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L); + CeQueueDto dto = queueDto.get(); + assertThat(dto.getTaskType()).isEqualTo(taskSubmit.getType()); + assertThat(dto.getComponentUuid()).isEqualTo(taskSubmit.getComponentUuid()); + assertThat(dto.getSubmitterLogin()).isEqualTo(taskSubmit.getSubmitterLogin()); + assertThat(dto.getCreatedAt()).isEqualTo(dto.getUpdatedAt()).isNotNull(); } private ComponentDto newComponentDto(String uuid) { diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java index 05c6112ff57..4fc5aee88b4 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java @@ -24,6 +24,7 @@ import java.util.Optional; import javax.annotation.Nullable; import org.junit.Rule; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.Mockito; import org.sonar.api.utils.log.LogTester; @@ -36,16 +37,16 @@ import org.sonar.db.ce.CeTaskTypes; import org.sonar.server.computation.task.projectanalysis.taskprocessor.ReportTaskProcessor; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; public class CeWorkerCallableImplTest { - private static final String UNKNOWN_WORKER_UUID = "UNKNOWN"; - @Rule public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule(); @Rule @@ -54,12 +55,13 @@ public class CeWorkerCallableImplTest { private InternalCeQueue queue = mock(InternalCeQueue.class); private ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class); private CeLogging ceLogging = spy(CeLogging.class); + private ArgumentCaptor workerUuid = ArgumentCaptor.forClass(String.class); private CeWorkerCallable underTest = new CeWorkerCallableImpl(queue, ceLogging, taskProcessorRepository); private InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue); @Test public void no_pending_tasks_in_queue() throws Exception { - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.empty()); + when(queue.peek(anyString())).thenReturn(Optional.empty()); assertThat(underTest.call()).isFalse(); @@ -70,10 +72,11 @@ public class CeWorkerCallableImplTest { public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception { CeTask task = createCeTask(null); taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT); - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task)); + when(queue.peek(anyString())).thenReturn(Optional.of(task)); assertThat(underTest.call()).isTrue(); + verifyWorkerUuid(); inOrder.verify(ceLogging).initForTask(task); inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, null); inOrder.verify(ceLogging).clearForTask(); @@ -83,10 +86,11 @@ public class CeWorkerCallableImplTest { public void peek_and_process_task() throws Exception { CeTask task = createCeTask(null); taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task)); + when(queue.peek(anyString())).thenReturn(Optional.of(task)); assertThat(underTest.call()).isTrue(); + verifyWorkerUuid(); inOrder.verify(ceLogging).initForTask(task); inOrder.verify(taskProcessor).process(task); inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS, null, null); @@ -96,12 +100,13 @@ public class CeWorkerCallableImplTest { @Test public void fail_to_process_task() throws Exception { CeTask task = createCeTask(null); - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task)); + when(queue.peek(anyString())).thenReturn(Optional.of(task)); taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); Throwable error = makeTaskProcessorFail(task); assertThat(underTest.call()).isTrue(); + verifyWorkerUuid(); inOrder.verify(ceLogging).initForTask(task); inOrder.verify(taskProcessor).process(task); inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, error); @@ -110,11 +115,12 @@ public class CeWorkerCallableImplTest { @Test public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_success() throws Exception { - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask(null))); + when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask(null))); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); underTest.call(); + verifyWorkerUuid(); List logs = logTester.logs(LoggerLevel.INFO); assertThat(logs).hasSize(2); for (int i = 0; i < 2; i++) { @@ -125,12 +131,13 @@ public class CeWorkerCallableImplTest { @Test public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_error() throws Exception { CeTask ceTask = createCeTask(null); - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor); makeTaskProcessorFail(ceTask); underTest.call(); + verifyWorkerUuid(); List logs = logTester.logs(LoggerLevel.INFO); assertThat(logs).hasSize(1); assertThat(logs.get(0)).doesNotContain(" | submitter="); @@ -144,11 +151,12 @@ public class CeWorkerCallableImplTest { @Test public void display_submitterLogin_in_logs_when_set_in_case_of_success() throws Exception { - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask("FooBar"))); + when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask("FooBar"))); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); underTest.call(); + verifyWorkerUuid(); List logs = logTester.logs(LoggerLevel.INFO); assertThat(logs).hasSize(2); assertThat(logs.get(0)).contains(" | submitter=FooBar"); @@ -160,12 +168,13 @@ public class CeWorkerCallableImplTest { @Test public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception { CeTask ceTask = createCeTask("FooBar"); - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor); makeTaskProcessorFail(ceTask); underTest.call(); + verifyWorkerUuid(); List logs = logTester.logs(LoggerLevel.INFO); assertThat(logs).hasSize(1); assertThat(logs.iterator().next()).contains(" | submitter=FooBar"); @@ -179,11 +188,12 @@ public class CeWorkerCallableImplTest { public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception { logTester.setLevel(LoggerLevel.DEBUG); - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask("FooBar"))); + when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask("FooBar"))); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); underTest.call(); + verifyWorkerUuid(); List logs = logTester.logs(LoggerLevel.INFO); assertThat(logs).hasSize(2); assertThat(logs.get(0)).contains(" | submitter=FooBar"); @@ -197,12 +207,13 @@ public class CeWorkerCallableImplTest { logTester.setLevel(LoggerLevel.DEBUG); CeTask ceTask = createCeTask("FooBar"); - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); makeTaskProcessorFail(ceTask); underTest.call(); + verifyWorkerUuid(); List logs = logTester.logs(LoggerLevel.INFO); assertThat(logs).hasSize(1); assertThat(logs.iterator().next()).contains(" | submitter=FooBar"); @@ -213,6 +224,11 @@ public class CeWorkerCallableImplTest { assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty(); } + private void verifyWorkerUuid() { + verify(queue).peek(workerUuid.capture()); + assertThat(workerUuid.getValue()).startsWith("uuid"); + } + private static CeTask createCeTask(@Nullable String submitterLogin) { return new CeTask.Builder() .setOrganizationUuid("org1") diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java index 589f2e200c1..c4a747cd078 100644 --- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java +++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java @@ -106,6 +106,14 @@ public class CeQueueDao implements Dao { mapper(session).resetAllToPendingStatus(system2.now()); } + /** + * Update all tasks for the specified worker uuid which are not PENDING to: + * STATUS='PENDING', STARTED_AT=NULL, UPDATED_AT={now}. + */ + public int resetToPendingForWorker(DbSession session, String workerUuid) { + return mapper(session).resetToPendingForWorker(workerUuid, system2.now()); + } + public int countByStatus(DbSession dbSession, CeQueueDto.Status status) { return mapper(dbSession).countByStatusAndComponentUuid(status, null); } diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java index 2d82fb11ea4..e08b952c1b7 100644 --- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java +++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java @@ -52,6 +52,8 @@ public interface CeQueueMapper { void resetAllToPendingStatus(@Param("updatedAt") long updatedAt); + int resetToPendingForWorker(@Param("workerUuid") String workerUuid, @Param("updatedAt") long updatedAt); + int updateIf(@Param("uuid") String uuid, @Param("new") UpdateIf.NewProperties newProperties, @Param("old") UpdateIf.OldProperties oldProperties); diff --git a/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml b/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml index df76f8c561e..90f60a85806 100644 --- a/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml +++ b/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml @@ -222,6 +222,16 @@ status <> 'PENDING' + + update ce_queue set + status='PENDING', + started_at=NULL, + updated_at=#{updatedAt,jdbcType=BIGINT} + where + status <> 'PENDING' + and worker_uuid = #{workerUuid,jdbcType=VARCHAR} + + update ce_queue set status=#{new.status,jdbcType=VARCHAR}, diff --git a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java index 0bcc63c2521..15605396f9d 100644 --- a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java +++ b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java @@ -256,6 +256,50 @@ public class CeQueueDaoTest { assertThat(saved.getExecutionCount()).isEqualTo(EXECUTION_COUNT); } + @Test + public void resetToPendingForWorker_resets_status_of_non_pending_tasks_only_for_specified_workerUuid() { + long startedAt = 2_099_888L; + CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_1, startedAt); + CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_1, startedAt); + CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_1, startedAt); + CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_1, startedAt); + CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_2, startedAt); + CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_2, startedAt); + CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_2, startedAt); + CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_2, startedAt); + + underTestAlwaysIncreasingSystem2.resetToPendingForWorker(db.getSession(), WORKER_UUID_1); + + verifyResetToPendingForWorker(u1); + verifyUnchangedByResetToPendingForWorker(u2); + verifyUnchangedByResetToPendingForWorker(u3); + verifyResetToPendingForWorker(u4); + verifyUnchangedByResetToPendingForWorker(o1); + verifyUnchangedByResetToPendingForWorker(o2); + verifyUnchangedByResetToPendingForWorker(o3); + verifyUnchangedByResetToPendingForWorker(o4); + } + + private void verifyResetToPendingForWorker(CeQueueDto original) { + CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get(); + assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING); + assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount()); + assertThat(dto.getStartedAt()).isNull(); + assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt()); + assertThat(dto.getUpdatedAt()).isGreaterThan(original.getUpdatedAt()); + assertThat(dto.getWorkerUuid()).isEqualTo(original.getWorkerUuid()); + } + + private void verifyUnchangedByResetToPendingForWorker(CeQueueDto original) { + CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get(); + assertThat(dto.getStatus()).isEqualTo(original.getStatus()); + assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount()); + assertThat(dto.getStartedAt()).isEqualTo(original.getStartedAt()); + assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt()); + assertThat(dto.getUpdatedAt()).isEqualTo(original.getUpdatedAt()); + assertThat(dto.getWorkerUuid()).isEqualTo(original.getWorkerUuid()); + } + @Test public void peek_none_if_no_pendings() throws Exception { assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT).isPresent()).isFalse(); @@ -510,6 +554,20 @@ public class CeQueueDaoTest { return dto; } + private CeQueueDto insert(String uuid, CeQueueDto.Status status, int executionCount, String workerUuid, Long startedAt) { + CeQueueDto dto = new CeQueueDto(); + dto.setUuid(uuid); + dto.setTaskType(CeTaskTypes.REPORT); + dto.setStatus(status); + dto.setSubmitterLogin("henri"); + dto.setExecutionCount(executionCount); + dto.setWorkerUuid(workerUuid); + dto.setStartedAt(startedAt); + underTestAlwaysIncreasingSystem2.insert(db.getSession(), dto); + db.getSession().commit(); + return dto; + } + private CeQueueDto insert(String uuid, String componentUuid, CeQueueDto.Status status) { CeQueueDto dto = new CeQueueDto(); dto.setUuid(uuid); -- 2.39.5