aboutsummaryrefslogtreecommitdiffstats
path: root/server/sonar-ce
diff options
context:
space:
mode:
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>2017-03-29 15:16:28 +0200
committerEric Hartmann <hartmann.eric@gmail.Com>2017-04-27 09:23:18 +0200
commitd463c9d9db163c0dca93a8ac720b50d5251fd5f9 (patch)
tree6993e6cce5b3722b4e795ffb6955b4f70c8022ff /server/sonar-ce
parent5c659e207f045706c4408cc9ae72c56a71e545d9 (diff)
downloadsonarqube-d463c9d9db163c0dca93a8ac720b50d5251fd5f9.tar.gz
sonarqube-d463c9d9db163c0dca93a8ac720b50d5251fd5f9.zip
SONAR-8987 worker reset any in progress task it has when peeking
Diffstat (limited to 'server/sonar-ce')
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java10
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java4
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java118
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java40
4 files changed, 152 insertions, 20 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
index 8f899d4b2e6..8e5bc19bf8f 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
@@ -31,11 +31,13 @@ import javax.annotation.Nullable;
import org.apache.log4j.Logger;
import org.sonar.api.ce.ComputeEngineSide;
import org.sonar.api.utils.System2;
+import org.sonar.api.utils.log.Loggers;
import org.sonar.ce.monitoring.CEQueueStatus;
import org.sonar.core.util.UuidFactory;
import org.sonar.db.DbClient;
import org.sonar.db.DbSession;
import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeQueueDao;
import org.sonar.db.ce.CeQueueDto;
import org.sonar.server.organization.DefaultOrganizationProvider;
@@ -45,6 +47,7 @@ import static java.util.Objects.requireNonNull;
@ComputeEngineSide
public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue {
+ private static final org.sonar.api.utils.log.Logger LOG = Loggers.get(InternalCeQueueImpl.class);
private static final int MAX_EXECUTION_COUNT = 2;
@@ -71,7 +74,12 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
return Optional.empty();
}
try (DbSession dbSession = dbClient.openSession(false)) {
- Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession, workerUuid, MAX_EXECUTION_COUNT);
+ CeQueueDao ceQueueDao = dbClient.ceQueueDao();
+ int i = ceQueueDao.resetToPendingForWorker(dbSession, workerUuid);
+ if (i > 0) {
+ LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid);
+ }
+ Optional<CeQueueDto> dto = ceQueueDao.peek(dbSession, workerUuid, MAX_EXECUTION_COUNT);
CeTask task = null;
if (dto.isPresent()) {
task = loadTask(dbSession, dto.get());
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java
index 517a0a7651d..eb6d9d2ee24 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java
@@ -20,6 +20,7 @@
package org.sonar.ce.taskprocessor;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.sonar.api.utils.log.Logger;
import org.sonar.api.utils.log.Loggers;
@@ -61,9 +62,10 @@ public class CeWorkerCallableImpl implements CeWorkerCallable {
return true;
}
+ private static final AtomicLong counter = new AtomicLong(0);
private Optional<CeTask> tryAndFindTaskToExecute() {
try {
- return queue.peek("UNKNOWN" /*FIXME provide a real worker uuid*/);
+ return queue.peek("uuid" + counter.addAndGet(100));
} catch (Exception e) {
LOG.error("Failed to pop the queue of analysis reports", e);
}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
index 071bd177845..93340a39758 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
@@ -31,7 +31,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.sonar.api.utils.System2;
-import org.sonar.api.utils.internal.TestSystem2;
+import org.sonar.api.utils.internal.AlwaysIncreasingSystem2;
import org.sonar.ce.monitoring.CEQueueStatus;
import org.sonar.ce.monitoring.CEQueueStatusImpl;
import org.sonar.core.util.UuidFactory;
@@ -47,6 +47,7 @@ import org.sonar.db.organization.OrganizationDto;
import org.sonar.server.organization.DefaultOrganization;
import org.sonar.server.organization.DefaultOrganizationProvider;
+import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
@@ -58,7 +59,7 @@ public class InternalCeQueueImplTest {
private static final String WORKER_UUID_1 = "worker uuid 1";
private static final String WORKER_UUID_2 = "worker uuid 2";
- private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L);
+ private System2 system2 = new AlwaysIncreasingSystem2();
@Rule
public ExpectedException expectedException = ExpectedException.none();
@@ -352,6 +353,110 @@ public class InternalCeQueueImplTest {
}
@Test
+ public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_updates_updatedAt_no_matter_execution_count() {
+ insertPending("u0", "doesn't matter", 0); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB
+ CeQueueDto u1 = insertPending("u1", WORKER_UUID_1, 2);// won't be peeked because it's worn-out
+ CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_1, 3);// will be reset but won't be picked because it's worn-out
+ CeQueueDto u3 = insertPending("u3", WORKER_UUID_1, 1);// will be picked-because older than any of the reset ones
+ CeQueueDto u4 = insertInProgress("u4", WORKER_UUID_1, 1);// will be reset
+
+ assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u0");
+
+ verifyUnmodifiedTask(u1);
+ verifyResetTask(u2);
+ verifyUnmodifiedTask(u3);
+ verifyResetTask(u4);
+ }
+
+ @Test
+ public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_only_this_uuid() {
+ insertPending("u0", "doesn't matter", 0); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB
+ CeQueueDto u1 = insertInProgress("u1", WORKER_UUID_1, 3);
+ CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_2, 3);
+ CeQueueDto u3 = insertInProgress("u3", WORKER_UUID_1, 3);
+ CeQueueDto u4 = insertInProgress("u4", WORKER_UUID_2, 1);
+
+ assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u0");
+
+ verifyResetTask(u1);
+ verifyUnmodifiedTask(u2);
+ verifyResetTask(u3);
+ verifyUnmodifiedTask(u4);
+ }
+
+ @Test
+ public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_peeks_the_oldest_non_worn_out_no_matter_if_it_has_been_reset_or_not() {
+ insertPending("u1", WORKER_UUID_1, 3); // won't be picked because worn out
+ insertInProgress("u2", WORKER_UUID_1, 3); // will be reset but won't be picked because worn out
+ insertPending("u3", WORKER_UUID_1, 0); // will be picked first
+ insertInProgress("u4", WORKER_UUID_1, 1); // will be reset and picked on second call only
+
+ Optional<CeTask> ceTask = underTest.peek(WORKER_UUID_1);
+ assertThat(ceTask.get().getUuid()).isEqualTo("u3");
+
+ // remove first task and do another peek: will pick the reset task since it's now the oldest one
+ underTest.remove(ceTask.get(), CeActivityDto.Status.SUCCESS, null, null);
+ assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u4");
+ }
+
+ @Test
+ public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_peeks_reset_tasks_if_is_the_oldest_non_worn_out() {
+ insertPending("u1", WORKER_UUID_1, 3); // won't be picked because worn out
+ insertInProgress("u2", WORKER_UUID_1, 3); // will be reset but won't be picked because worn out
+ insertInProgress("u3", WORKER_UUID_1, 1); // will be reset and picked
+ insertPending("u4", WORKER_UUID_1, 0); // will be picked second
+
+ Optional<CeTask> ceTask = underTest.peek(WORKER_UUID_1);
+ assertThat(ceTask.get().getUuid()).isEqualTo("u3");
+
+ // remove first task and do another peek: will pick the reset task since it's now the oldest one
+ underTest.remove(ceTask.get(), CeActivityDto.Status.SUCCESS, null, null);
+ assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u4");
+ }
+
+ private void verifyResetTask(CeQueueDto originalDto) {
+ CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(session, originalDto.getUuid()).get();
+ assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING);
+ assertThat(dto.getExecutionCount()).isEqualTo(originalDto.getExecutionCount());
+ assertThat(dto.getCreatedAt()).isEqualTo(originalDto.getCreatedAt());
+ assertThat(dto.getUpdatedAt()).isGreaterThan(originalDto.getUpdatedAt());
+ }
+
+ private void verifyUnmodifiedTask(CeQueueDto originalDto) {
+ CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(session, originalDto.getUuid()).get();
+ assertThat(dto.getStatus()).isEqualTo(originalDto.getStatus());
+ assertThat(dto.getExecutionCount()).isEqualTo(originalDto.getExecutionCount());
+ assertThat(dto.getCreatedAt()).isEqualTo(originalDto.getCreatedAt());
+ assertThat(dto.getUpdatedAt()).isEqualTo(originalDto.getUpdatedAt());
+ }
+
+ private CeQueueDto insertInProgress(String uuid, String workerUuid, int executionCount) {
+ checkArgument(executionCount > 0, "execution count less than 1 does not make sense for an in progress task");
+ CeQueueDto dto = new CeQueueDto()
+ .setUuid(uuid)
+ .setTaskType("foo")
+ .setStatus(CeQueueDto.Status.IN_PROGRESS)
+ .setWorkerUuid(workerUuid)
+ .setExecutionCount(executionCount);
+ dbTester.getDbClient().ceQueueDao().insert(session, dto);
+ dbTester.commit();
+ return dto;
+ }
+
+ private CeQueueDto insertPending(String uuid, String workerUuid, int executionCount) {
+ checkArgument(executionCount > -1, "execution count less than 0 does not make sense for a pending task");
+ CeQueueDto dto = new CeQueueDto()
+ .setUuid(uuid)
+ .setTaskType("foo")
+ .setStatus(CeQueueDto.Status.PENDING)
+ .setWorkerUuid(workerUuid)
+ .setExecutionCount(executionCount);
+ dbTester.getDbClient().ceQueueDao().insert(session, dto);
+ dbTester.commit();
+ return dto;
+ }
+
+ @Test
public void cancel_pending() throws Exception {
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
@@ -503,10 +608,11 @@ public class InternalCeQueueImplTest {
private void verifyCeQueueDtoForTaskSubmit(CeTaskSubmit taskSubmit) {
Optional<CeQueueDto> queueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), taskSubmit.getUuid());
assertThat(queueDto.isPresent()).isTrue();
- assertThat(queueDto.get().getTaskType()).isEqualTo(taskSubmit.getType());
- assertThat(queueDto.get().getComponentUuid()).isEqualTo(taskSubmit.getComponentUuid());
- assertThat(queueDto.get().getSubmitterLogin()).isEqualTo(taskSubmit.getSubmitterLogin());
- assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L);
+ CeQueueDto dto = queueDto.get();
+ assertThat(dto.getTaskType()).isEqualTo(taskSubmit.getType());
+ assertThat(dto.getComponentUuid()).isEqualTo(taskSubmit.getComponentUuid());
+ assertThat(dto.getSubmitterLogin()).isEqualTo(taskSubmit.getSubmitterLogin());
+ assertThat(dto.getCreatedAt()).isEqualTo(dto.getUpdatedAt()).isNotNull();
}
private ComponentDto newComponentDto(String uuid) {
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java
index 05c6112ff57..4fc5aee88b4 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java
@@ -24,6 +24,7 @@ import java.util.Optional;
import javax.annotation.Nullable;
import org.junit.Rule;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.sonar.api.utils.log.LogTester;
@@ -36,16 +37,16 @@ import org.sonar.db.ce.CeTaskTypes;
import org.sonar.server.computation.task.projectanalysis.taskprocessor.ReportTaskProcessor;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class CeWorkerCallableImplTest {
- private static final String UNKNOWN_WORKER_UUID = "UNKNOWN";
-
@Rule
public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule();
@Rule
@@ -54,12 +55,13 @@ public class CeWorkerCallableImplTest {
private InternalCeQueue queue = mock(InternalCeQueue.class);
private ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
private CeLogging ceLogging = spy(CeLogging.class);
+ private ArgumentCaptor<String> workerUuid = ArgumentCaptor.forClass(String.class);
private CeWorkerCallable underTest = new CeWorkerCallableImpl(queue, ceLogging, taskProcessorRepository);
private InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
@Test
public void no_pending_tasks_in_queue() throws Exception {
- when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.empty());
+ when(queue.peek(anyString())).thenReturn(Optional.empty());
assertThat(underTest.call()).isFalse();
@@ -70,10 +72,11 @@ public class CeWorkerCallableImplTest {
public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception {
CeTask task = createCeTask(null);
taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
- when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task));
+ when(queue.peek(anyString())).thenReturn(Optional.of(task));
assertThat(underTest.call()).isTrue();
+ verifyWorkerUuid();
inOrder.verify(ceLogging).initForTask(task);
inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, null);
inOrder.verify(ceLogging).clearForTask();
@@ -83,10 +86,11 @@ public class CeWorkerCallableImplTest {
public void peek_and_process_task() throws Exception {
CeTask task = createCeTask(null);
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
- when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task));
+ when(queue.peek(anyString())).thenReturn(Optional.of(task));
assertThat(underTest.call()).isTrue();
+ verifyWorkerUuid();
inOrder.verify(ceLogging).initForTask(task);
inOrder.verify(taskProcessor).process(task);
inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS, null, null);
@@ -96,12 +100,13 @@ public class CeWorkerCallableImplTest {
@Test
public void fail_to_process_task() throws Exception {
CeTask task = createCeTask(null);
- when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task));
+ when(queue.peek(anyString())).thenReturn(Optional.of(task));
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
Throwable error = makeTaskProcessorFail(task);
assertThat(underTest.call()).isTrue();
+ verifyWorkerUuid();
inOrder.verify(ceLogging).initForTask(task);
inOrder.verify(taskProcessor).process(task);
inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, error);
@@ -110,11 +115,12 @@ public class CeWorkerCallableImplTest {
@Test
public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_success() throws Exception {
- when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask(null)));
+ when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask(null)));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
underTest.call();
+ verifyWorkerUuid();
List<String> logs = logTester.logs(LoggerLevel.INFO);
assertThat(logs).hasSize(2);
for (int i = 0; i < 2; i++) {
@@ -125,12 +131,13 @@ public class CeWorkerCallableImplTest {
@Test
public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_error() throws Exception {
CeTask ceTask = createCeTask(null);
- when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
makeTaskProcessorFail(ceTask);
underTest.call();
+ verifyWorkerUuid();
List<String> logs = logTester.logs(LoggerLevel.INFO);
assertThat(logs).hasSize(1);
assertThat(logs.get(0)).doesNotContain(" | submitter=");
@@ -144,11 +151,12 @@ public class CeWorkerCallableImplTest {
@Test
public void display_submitterLogin_in_logs_when_set_in_case_of_success() throws Exception {
- when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask("FooBar")));
+ when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask("FooBar")));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
underTest.call();
+ verifyWorkerUuid();
List<String> logs = logTester.logs(LoggerLevel.INFO);
assertThat(logs).hasSize(2);
assertThat(logs.get(0)).contains(" | submitter=FooBar");
@@ -160,12 +168,13 @@ public class CeWorkerCallableImplTest {
@Test
public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception {
CeTask ceTask = createCeTask("FooBar");
- when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
makeTaskProcessorFail(ceTask);
underTest.call();
+ verifyWorkerUuid();
List<String> logs = logTester.logs(LoggerLevel.INFO);
assertThat(logs).hasSize(1);
assertThat(logs.iterator().next()).contains(" | submitter=FooBar");
@@ -179,11 +188,12 @@ public class CeWorkerCallableImplTest {
public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception {
logTester.setLevel(LoggerLevel.DEBUG);
- when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask("FooBar")));
+ when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask("FooBar")));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
underTest.call();
+ verifyWorkerUuid();
List<String> logs = logTester.logs(LoggerLevel.INFO);
assertThat(logs).hasSize(2);
assertThat(logs.get(0)).contains(" | submitter=FooBar");
@@ -197,12 +207,13 @@ public class CeWorkerCallableImplTest {
logTester.setLevel(LoggerLevel.DEBUG);
CeTask ceTask = createCeTask("FooBar");
- when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
makeTaskProcessorFail(ceTask);
underTest.call();
+ verifyWorkerUuid();
List<String> logs = logTester.logs(LoggerLevel.INFO);
assertThat(logs).hasSize(1);
assertThat(logs.iterator().next()).contains(" | submitter=FooBar");
@@ -213,6 +224,11 @@ public class CeWorkerCallableImplTest {
assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
}
+ private void verifyWorkerUuid() {
+ verify(queue).peek(workerUuid.capture());
+ assertThat(workerUuid.getValue()).startsWith("uuid");
+ }
+
private static CeTask createCeTask(@Nullable String submitterLogin) {
return new CeTask.Builder()
.setOrganizationUuid("org1")