diff options
author | Sébastien Lesaint <sebastien.lesaint@sonarsource.com> | 2017-03-29 15:16:28 +0200 |
---|---|---|
committer | Eric Hartmann <hartmann.eric@gmail.Com> | 2017-04-27 09:23:18 +0200 |
commit | d463c9d9db163c0dca93a8ac720b50d5251fd5f9 (patch) | |
tree | 6993e6cce5b3722b4e795ffb6955b4f70c8022ff /server/sonar-ce | |
parent | 5c659e207f045706c4408cc9ae72c56a71e545d9 (diff) | |
download | sonarqube-d463c9d9db163c0dca93a8ac720b50d5251fd5f9.tar.gz sonarqube-d463c9d9db163c0dca93a8ac720b50d5251fd5f9.zip |
SONAR-8987 worker reset any in progress task it has when peeking
Diffstat (limited to 'server/sonar-ce')
4 files changed, 152 insertions, 20 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java index 8f899d4b2e6..8e5bc19bf8f 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java @@ -31,11 +31,13 @@ import javax.annotation.Nullable; import org.apache.log4j.Logger; import org.sonar.api.ce.ComputeEngineSide; import org.sonar.api.utils.System2; +import org.sonar.api.utils.log.Loggers; import org.sonar.ce.monitoring.CEQueueStatus; import org.sonar.core.util.UuidFactory; import org.sonar.db.DbClient; import org.sonar.db.DbSession; import org.sonar.db.ce.CeActivityDto; +import org.sonar.db.ce.CeQueueDao; import org.sonar.db.ce.CeQueueDto; import org.sonar.server.organization.DefaultOrganizationProvider; @@ -45,6 +47,7 @@ import static java.util.Objects.requireNonNull; @ComputeEngineSide public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue { + private static final org.sonar.api.utils.log.Logger LOG = Loggers.get(InternalCeQueueImpl.class); private static final int MAX_EXECUTION_COUNT = 2; @@ -71,7 +74,12 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue return Optional.empty(); } try (DbSession dbSession = dbClient.openSession(false)) { - Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession, workerUuid, MAX_EXECUTION_COUNT); + CeQueueDao ceQueueDao = dbClient.ceQueueDao(); + int i = ceQueueDao.resetToPendingForWorker(dbSession, workerUuid); + if (i > 0) { + LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid); + } + Optional<CeQueueDto> dto = ceQueueDao.peek(dbSession, workerUuid, MAX_EXECUTION_COUNT); CeTask task = null; if (dto.isPresent()) { task = loadTask(dbSession, dto.get()); diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java index 517a0a7651d..eb6d9d2ee24 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java @@ -20,6 +20,7 @@ package org.sonar.ce.taskprocessor; import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import org.sonar.api.utils.log.Logger; import org.sonar.api.utils.log.Loggers; @@ -61,9 +62,10 @@ public class CeWorkerCallableImpl implements CeWorkerCallable { return true; } + private static final AtomicLong counter = new AtomicLong(0); private Optional<CeTask> tryAndFindTaskToExecute() { try { - return queue.peek("UNKNOWN" /*FIXME provide a real worker uuid*/); + return queue.peek("uuid" + counter.addAndGet(100)); } catch (Exception e) { LOG.error("Failed to pop the queue of analysis reports", e); } diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java index 071bd177845..93340a39758 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java @@ -31,7 +31,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.sonar.api.utils.System2; -import org.sonar.api.utils.internal.TestSystem2; +import org.sonar.api.utils.internal.AlwaysIncreasingSystem2; import org.sonar.ce.monitoring.CEQueueStatus; import org.sonar.ce.monitoring.CEQueueStatusImpl; import org.sonar.core.util.UuidFactory; @@ -47,6 +47,7 @@ import org.sonar.db.organization.OrganizationDto; import org.sonar.server.organization.DefaultOrganization; import org.sonar.server.organization.DefaultOrganizationProvider; +import static com.google.common.base.Preconditions.checkArgument; import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -58,7 +59,7 @@ public class InternalCeQueueImplTest { private static final String WORKER_UUID_1 = "worker uuid 1"; private static final String WORKER_UUID_2 = "worker uuid 2"; - private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L); + private System2 system2 = new AlwaysIncreasingSystem2(); @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -352,6 +353,110 @@ public class InternalCeQueueImplTest { } @Test + public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_updates_updatedAt_no_matter_execution_count() { + insertPending("u0", "doesn't matter", 0); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB + CeQueueDto u1 = insertPending("u1", WORKER_UUID_1, 2);// won't be peeked because it's worn-out + CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_1, 3);// will be reset but won't be picked because it's worn-out + CeQueueDto u3 = insertPending("u3", WORKER_UUID_1, 1);// will be picked-because older than any of the reset ones + CeQueueDto u4 = insertInProgress("u4", WORKER_UUID_1, 1);// will be reset + + assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u0"); + + verifyUnmodifiedTask(u1); + verifyResetTask(u2); + verifyUnmodifiedTask(u3); + verifyResetTask(u4); + } + + @Test + public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_only_this_uuid() { + insertPending("u0", "doesn't matter", 0); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB + CeQueueDto u1 = insertInProgress("u1", WORKER_UUID_1, 3); + CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_2, 3); + CeQueueDto u3 = insertInProgress("u3", WORKER_UUID_1, 3); + CeQueueDto u4 = insertInProgress("u4", WORKER_UUID_2, 1); + + assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u0"); + + verifyResetTask(u1); + verifyUnmodifiedTask(u2); + verifyResetTask(u3); + verifyUnmodifiedTask(u4); + } + + @Test + public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_peeks_the_oldest_non_worn_out_no_matter_if_it_has_been_reset_or_not() { + insertPending("u1", WORKER_UUID_1, 3); // won't be picked because worn out + insertInProgress("u2", WORKER_UUID_1, 3); // will be reset but won't be picked because worn out + insertPending("u3", WORKER_UUID_1, 0); // will be picked first + insertInProgress("u4", WORKER_UUID_1, 1); // will be reset and picked on second call only + + Optional<CeTask> ceTask = underTest.peek(WORKER_UUID_1); + assertThat(ceTask.get().getUuid()).isEqualTo("u3"); + + // remove first task and do another peek: will pick the reset task since it's now the oldest one + underTest.remove(ceTask.get(), CeActivityDto.Status.SUCCESS, null, null); + assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u4"); + } + + @Test + public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_peeks_reset_tasks_if_is_the_oldest_non_worn_out() { + insertPending("u1", WORKER_UUID_1, 3); // won't be picked because worn out + insertInProgress("u2", WORKER_UUID_1, 3); // will be reset but won't be picked because worn out + insertInProgress("u3", WORKER_UUID_1, 1); // will be reset and picked + insertPending("u4", WORKER_UUID_1, 0); // will be picked second + + Optional<CeTask> ceTask = underTest.peek(WORKER_UUID_1); + assertThat(ceTask.get().getUuid()).isEqualTo("u3"); + + // remove first task and do another peek: will pick the reset task since it's now the oldest one + underTest.remove(ceTask.get(), CeActivityDto.Status.SUCCESS, null, null); + assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u4"); + } + + private void verifyResetTask(CeQueueDto originalDto) { + CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(session, originalDto.getUuid()).get(); + assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING); + assertThat(dto.getExecutionCount()).isEqualTo(originalDto.getExecutionCount()); + assertThat(dto.getCreatedAt()).isEqualTo(originalDto.getCreatedAt()); + assertThat(dto.getUpdatedAt()).isGreaterThan(originalDto.getUpdatedAt()); + } + + private void verifyUnmodifiedTask(CeQueueDto originalDto) { + CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(session, originalDto.getUuid()).get(); + assertThat(dto.getStatus()).isEqualTo(originalDto.getStatus()); + assertThat(dto.getExecutionCount()).isEqualTo(originalDto.getExecutionCount()); + assertThat(dto.getCreatedAt()).isEqualTo(originalDto.getCreatedAt()); + assertThat(dto.getUpdatedAt()).isEqualTo(originalDto.getUpdatedAt()); + } + + private CeQueueDto insertInProgress(String uuid, String workerUuid, int executionCount) { + checkArgument(executionCount > 0, "execution count less than 1 does not make sense for an in progress task"); + CeQueueDto dto = new CeQueueDto() + .setUuid(uuid) + .setTaskType("foo") + .setStatus(CeQueueDto.Status.IN_PROGRESS) + .setWorkerUuid(workerUuid) + .setExecutionCount(executionCount); + dbTester.getDbClient().ceQueueDao().insert(session, dto); + dbTester.commit(); + return dto; + } + + private CeQueueDto insertPending(String uuid, String workerUuid, int executionCount) { + checkArgument(executionCount > -1, "execution count less than 0 does not make sense for a pending task"); + CeQueueDto dto = new CeQueueDto() + .setUuid(uuid) + .setTaskType("foo") + .setStatus(CeQueueDto.Status.PENDING) + .setWorkerUuid(workerUuid) + .setExecutionCount(executionCount); + dbTester.getDbClient().ceQueueDao().insert(session, dto); + dbTester.commit(); + return dto; + } + + @Test public void cancel_pending() throws Exception { CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); @@ -503,10 +608,11 @@ public class InternalCeQueueImplTest { private void verifyCeQueueDtoForTaskSubmit(CeTaskSubmit taskSubmit) { Optional<CeQueueDto> queueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), taskSubmit.getUuid()); assertThat(queueDto.isPresent()).isTrue(); - assertThat(queueDto.get().getTaskType()).isEqualTo(taskSubmit.getType()); - assertThat(queueDto.get().getComponentUuid()).isEqualTo(taskSubmit.getComponentUuid()); - assertThat(queueDto.get().getSubmitterLogin()).isEqualTo(taskSubmit.getSubmitterLogin()); - assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L); + CeQueueDto dto = queueDto.get(); + assertThat(dto.getTaskType()).isEqualTo(taskSubmit.getType()); + assertThat(dto.getComponentUuid()).isEqualTo(taskSubmit.getComponentUuid()); + assertThat(dto.getSubmitterLogin()).isEqualTo(taskSubmit.getSubmitterLogin()); + assertThat(dto.getCreatedAt()).isEqualTo(dto.getUpdatedAt()).isNotNull(); } private ComponentDto newComponentDto(String uuid) { diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java index 05c6112ff57..4fc5aee88b4 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java @@ -24,6 +24,7 @@ import java.util.Optional; import javax.annotation.Nullable; import org.junit.Rule; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.Mockito; import org.sonar.api.utils.log.LogTester; @@ -36,16 +37,16 @@ import org.sonar.db.ce.CeTaskTypes; import org.sonar.server.computation.task.projectanalysis.taskprocessor.ReportTaskProcessor; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; public class CeWorkerCallableImplTest { - private static final String UNKNOWN_WORKER_UUID = "UNKNOWN"; - @Rule public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule(); @Rule @@ -54,12 +55,13 @@ public class CeWorkerCallableImplTest { private InternalCeQueue queue = mock(InternalCeQueue.class); private ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class); private CeLogging ceLogging = spy(CeLogging.class); + private ArgumentCaptor<String> workerUuid = ArgumentCaptor.forClass(String.class); private CeWorkerCallable underTest = new CeWorkerCallableImpl(queue, ceLogging, taskProcessorRepository); private InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue); @Test public void no_pending_tasks_in_queue() throws Exception { - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.empty()); + when(queue.peek(anyString())).thenReturn(Optional.empty()); assertThat(underTest.call()).isFalse(); @@ -70,10 +72,11 @@ public class CeWorkerCallableImplTest { public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception { CeTask task = createCeTask(null); taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT); - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task)); + when(queue.peek(anyString())).thenReturn(Optional.of(task)); assertThat(underTest.call()).isTrue(); + verifyWorkerUuid(); inOrder.verify(ceLogging).initForTask(task); inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, null); inOrder.verify(ceLogging).clearForTask(); @@ -83,10 +86,11 @@ public class CeWorkerCallableImplTest { public void peek_and_process_task() throws Exception { CeTask task = createCeTask(null); taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task)); + when(queue.peek(anyString())).thenReturn(Optional.of(task)); assertThat(underTest.call()).isTrue(); + verifyWorkerUuid(); inOrder.verify(ceLogging).initForTask(task); inOrder.verify(taskProcessor).process(task); inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS, null, null); @@ -96,12 +100,13 @@ public class CeWorkerCallableImplTest { @Test public void fail_to_process_task() throws Exception { CeTask task = createCeTask(null); - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task)); + when(queue.peek(anyString())).thenReturn(Optional.of(task)); taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); Throwable error = makeTaskProcessorFail(task); assertThat(underTest.call()).isTrue(); + verifyWorkerUuid(); inOrder.verify(ceLogging).initForTask(task); inOrder.verify(taskProcessor).process(task); inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, error); @@ -110,11 +115,12 @@ public class CeWorkerCallableImplTest { @Test public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_success() throws Exception { - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask(null))); + when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask(null))); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); underTest.call(); + verifyWorkerUuid(); List<String> logs = logTester.logs(LoggerLevel.INFO); assertThat(logs).hasSize(2); for (int i = 0; i < 2; i++) { @@ -125,12 +131,13 @@ public class CeWorkerCallableImplTest { @Test public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_error() throws Exception { CeTask ceTask = createCeTask(null); - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor); makeTaskProcessorFail(ceTask); underTest.call(); + verifyWorkerUuid(); List<String> logs = logTester.logs(LoggerLevel.INFO); assertThat(logs).hasSize(1); assertThat(logs.get(0)).doesNotContain(" | submitter="); @@ -144,11 +151,12 @@ public class CeWorkerCallableImplTest { @Test public void display_submitterLogin_in_logs_when_set_in_case_of_success() throws Exception { - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask("FooBar"))); + when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask("FooBar"))); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); underTest.call(); + verifyWorkerUuid(); List<String> logs = logTester.logs(LoggerLevel.INFO); assertThat(logs).hasSize(2); assertThat(logs.get(0)).contains(" | submitter=FooBar"); @@ -160,12 +168,13 @@ public class CeWorkerCallableImplTest { @Test public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception { CeTask ceTask = createCeTask("FooBar"); - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor); makeTaskProcessorFail(ceTask); underTest.call(); + verifyWorkerUuid(); List<String> logs = logTester.logs(LoggerLevel.INFO); assertThat(logs).hasSize(1); assertThat(logs.iterator().next()).contains(" | submitter=FooBar"); @@ -179,11 +188,12 @@ public class CeWorkerCallableImplTest { public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception { logTester.setLevel(LoggerLevel.DEBUG); - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask("FooBar"))); + when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask("FooBar"))); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); underTest.call(); + verifyWorkerUuid(); List<String> logs = logTester.logs(LoggerLevel.INFO); assertThat(logs).hasSize(2); assertThat(logs.get(0)).contains(" | submitter=FooBar"); @@ -197,12 +207,13 @@ public class CeWorkerCallableImplTest { logTester.setLevel(LoggerLevel.DEBUG); CeTask ceTask = createCeTask("FooBar"); - when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); makeTaskProcessorFail(ceTask); underTest.call(); + verifyWorkerUuid(); List<String> logs = logTester.logs(LoggerLevel.INFO); assertThat(logs).hasSize(1); assertThat(logs.iterator().next()).contains(" | submitter=FooBar"); @@ -213,6 +224,11 @@ public class CeWorkerCallableImplTest { assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty(); } + private void verifyWorkerUuid() { + verify(queue).peek(workerUuid.capture()); + assertThat(workerUuid.getValue()).startsWith("uuid"); + } + private static CeTask createCeTask(@Nullable String submitterLogin) { return new CeTask.Builder() .setOrganizationUuid("org1") |