From 2349795115d14542a44dba354a5c36bed15742c5 Mon Sep 17 00:00:00 2001 From: =?utf8?q?S=C3=A9bastien=20Lesaint?= Date: Fri, 7 Jul 2017 16:25:22 +0200 Subject: [PATCH] SONAR-9525 CeWorker can be disabled by EnabledCeWorkerController in which case they won't attempt to retrieve a task to process and return immediatly --- .../taskprocessor/CeTaskProcessorModule.java | 1 + .../org/sonar/ce/taskprocessor/CeWorker.java | 2 + .../ce/taskprocessor/CeWorkerFactoryImpl.java | 9 ++- .../sonar/ce/taskprocessor/CeWorkerImpl.java | 9 ++- .../EnabledCeWorkerController.java | 31 ++++++++ .../EnabledCeWorkerControllerImpl.java | 42 ++++++++++ .../ComputeEngineContainerImplTest.java | 2 +- .../CeWorkerFactoryImplTest.java | 4 +- .../ce/taskprocessor/CeWorkerImplTest.java | 40 +++++++++- .../EnabledCeWorkerControllerImplTest.java | 78 +++++++++++++++++++ 10 files changed, 207 insertions(+), 11 deletions(-) create mode 100644 server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java create mode 100644 server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java create mode 100644 server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImplTest.java diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java index 9163507abe9..7b07618fd7b 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java @@ -27,6 +27,7 @@ public class CeTaskProcessorModule extends Module { add( CeTaskProcessorRepositoryImpl.class, CeWorkerFactoryImpl.class, + EnabledCeWorkerControllerImpl.class, CeProcessingSchedulerExecutorServiceImpl.class, CeProcessingSchedulerImpl.class); } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java index 9a7ffff9776..9db5a69ef2c 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java @@ -30,6 +30,8 @@ import org.sonar.ce.queue.CeTask; */ public interface CeWorker extends Callable { enum Result { + /** Worker is disabled */ + DISABLED, /** Worker found no task to process */ NO_TASK, /** Worker found a task and processed it (either successfully or not) */ diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java index 21228bbc4cc..ed8e11cd3c5 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java @@ -34,19 +34,22 @@ public class CeWorkerFactoryImpl implements CeWorkerFactory { private final InternalCeQueue queue; private final CeLogging ceLogging; private final CeTaskProcessorRepository taskProcessorRepository; + private final EnabledCeWorkerController enabledCeWorkerController; - public CeWorkerFactoryImpl(InternalCeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository, UuidFactory uuidFactory) { + public CeWorkerFactoryImpl(InternalCeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository, + UuidFactory uuidFactory, EnabledCeWorkerController enabledCeWorkerController) { this.queue = queue; this.ceLogging = ceLogging; this.taskProcessorRepository = taskProcessorRepository; - this.uuidFactory= uuidFactory; + this.uuidFactory = uuidFactory; + this.enabledCeWorkerController = enabledCeWorkerController; } @Override public CeWorker create(int ordinal) { String uuid = uuidFactory.create(); ceWorkerUUIDs.add(uuid); - return new CeWorkerImpl(ordinal, uuid, queue, ceLogging, taskProcessorRepository); + return new CeWorkerImpl(ordinal, uuid, queue, ceLogging, taskProcessorRepository, enabledCeWorkerController); } @Override diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java index ba5e447ac7f..cbcabd72a84 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java @@ -33,6 +33,7 @@ import org.sonar.db.ce.CeActivityDto; import static com.google.common.base.Preconditions.checkArgument; import static java.lang.String.format; +import static org.sonar.ce.taskprocessor.CeWorker.Result.DISABLED; import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK; import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED; @@ -45,14 +46,17 @@ public class CeWorkerImpl implements CeWorker { private final InternalCeQueue queue; private final CeLogging ceLogging; private final CeTaskProcessorRepository taskProcessorRepository; + private final EnabledCeWorkerController enabledCeWorkerController; public CeWorkerImpl(int ordinal, String uuid, - InternalCeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository) { + InternalCeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository, + EnabledCeWorkerController enabledCeWorkerController) { this.ordinal = checkOrdinal(ordinal); this.uuid = uuid; this.queue = queue; this.ceLogging = ceLogging; this.taskProcessorRepository = taskProcessorRepository; + this.enabledCeWorkerController = enabledCeWorkerController; } private static int checkOrdinal(int ordinal) { @@ -66,6 +70,9 @@ public class CeWorkerImpl implements CeWorker { } private Result findAndProcessTask() { + if (!enabledCeWorkerController.isEnabled(this)) { + return DISABLED; + } Optional ceTask = tryAndFindTaskToExecute(); if (!ceTask.isPresent()) { return NO_TASK; diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java new file mode 100644 index 00000000000..03e2a37f3bf --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java @@ -0,0 +1,31 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +/** + * This class is responsible of knowing/deciding which {@link CeWorker} is enabled and should actually try and find a + * task to process. + */ +public interface EnabledCeWorkerController { + /** + * Returns {@code true} if the specified {@link CeWorker} is enabled + */ + boolean isEnabled(CeWorker ceWorker); +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java new file mode 100644 index 00000000000..19ae6998732 --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java @@ -0,0 +1,42 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import java.util.concurrent.atomic.AtomicInteger; +import org.sonar.ce.configuration.CeConfiguration; + +public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController { + private final AtomicInteger workerCount; + + public EnabledCeWorkerControllerImpl(CeConfiguration ceConfiguration) { + this.workerCount = new AtomicInteger(ceConfiguration.getWorkerCount()); + } + + /** + * Returns {@code true} if {@link CeWorker#getOrdinal() worker ordinal} is strictly less than + * {@link CeConfiguration#getWorkerCount()}. + * + * This method does not fail if ordinal is invalid (ie. < 0). + */ + @Override + public boolean isEnabled(CeWorker ceWorker) { + return ceWorker.getOrdinal() < workerCount.get(); + } +} diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/container/ComputeEngineContainerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/container/ComputeEngineContainerImplTest.java index b9eec535920..3f03a007644 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/container/ComputeEngineContainerImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/container/ComputeEngineContainerImplTest.java @@ -119,7 +119,7 @@ public class ComputeEngineContainerImplTest { + 3 // content of CeHttpModule + 3 // content of CeTaskCommonsModule + 4 // content of ProjectAnalysisTaskModule - + 4 // content of CeTaskProcessorModule + + 5 // content of CeTaskProcessorModule + 3 // CeCleaningModule + its content + 1 // CeDistributedInformation ); diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java index fbf868392e6..21315bcd923 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java @@ -33,9 +33,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; public class CeWorkerFactoryImplTest { - private int randomOrdinal = new Random().nextInt(); + private int randomOrdinal = new Random().nextInt(20); private CeWorkerFactoryImpl underTest = new CeWorkerFactoryImpl(mock(InternalCeQueue.class), mock(CeLogging.class), - mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE); + mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE, mock(EnabledCeWorkerController.class)); @Test public void create_return_CeWorker_object_with_specified_ordinal() { diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java index 026c987ecd9..b75cd688776 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java @@ -25,6 +25,7 @@ import java.util.Random; import java.util.UUID; import javax.annotation.Nullable; import org.apache.commons.lang.RandomStringUtils; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -41,13 +42,16 @@ import org.sonar.db.ce.CeTaskTypes; import org.sonar.server.computation.task.projectanalysis.taskprocessor.ReportTaskProcessor; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; +import static org.sonar.ce.taskprocessor.CeWorker.Result.DISABLED; import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK; import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED; @@ -63,27 +67,43 @@ public class CeWorkerImplTest { private InternalCeQueue queue = mock(InternalCeQueue.class); private ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class); private CeLogging ceLogging = spy(CeLogging.class); + private EnabledCeWorkerController enabledCeWorkerController = mock(EnabledCeWorkerController.class); private ArgumentCaptor workerUuidCaptor = ArgumentCaptor.forClass(String.class); - private int randomOrdinal = new Random().nextInt(); + private int randomOrdinal = new Random().nextInt(50); private String workerUuid = UUID.randomUUID().toString(); - private CeWorker underTest = new CeWorkerImpl(randomOrdinal, workerUuid, queue, ceLogging, taskProcessorRepository); + private CeWorker underTest = new CeWorkerImpl(randomOrdinal, workerUuid, queue, ceLogging, taskProcessorRepository, enabledCeWorkerController); private InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue); + @Before + public void setUp() throws Exception { + when(enabledCeWorkerController.isEnabled(any(CeWorker.class))).thenReturn(true); + } + @Test public void constructor_throws_IAE_if_ordinal_is_less_than_zero() { expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Ordinal must be >= 0"); - new CeWorkerImpl(-1 - new Random().nextInt(20), workerUuid, queue, ceLogging, taskProcessorRepository); + new CeWorkerImpl(-1 - new Random().nextInt(20), workerUuid, queue, ceLogging, taskProcessorRepository, enabledCeWorkerController); } @Test public void getUUID_must_return_the_uuid_of_constructor() { String uuid = UUID.randomUUID().toString(); - CeWorker underTest = new CeWorkerImpl(randomOrdinal, uuid, queue, ceLogging, taskProcessorRepository); + CeWorker underTest = new CeWorkerImpl(randomOrdinal, uuid, queue, ceLogging, taskProcessorRepository, enabledCeWorkerController); assertThat(underTest.getUUID()).isEqualTo(uuid); } + @Test + public void worker_disabled() throws Exception { + reset(enabledCeWorkerController); + when(enabledCeWorkerController.isEnabled(underTest)).thenReturn(false); + + assertThat(underTest.call()).isEqualTo(DISABLED); + + verifyZeroInteractions(taskProcessor, ceLogging); + } + @Test public void no_pending_tasks_in_queue() throws Exception { when(queue.peek(anyString())).thenReturn(Optional.empty()); @@ -295,6 +315,18 @@ public class CeWorkerImplTest { newThread.join(); } + @Test + public void call_sets_and_restores_thread_name_with_information_of_worker_when_worker_is_disabled() throws Exception { + reset(enabledCeWorkerController); + when(enabledCeWorkerController.isEnabled(underTest)).thenReturn(false); + + String threadName = RandomStringUtils.randomAlphabetic(3); + Thread newThread = createThreadNameVerifyingThread(threadName); + + newThread.start(); + newThread.join(); + } + private Thread createThreadNameVerifyingThread(String threadName) { return new Thread(() -> { verifyUnchangedThreadName(threadName); diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImplTest.java new file mode 100644 index 00000000000..d050ed0b870 --- /dev/null +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImplTest.java @@ -0,0 +1,78 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import java.util.Random; +import org.junit.Rule; +import org.junit.Test; +import org.sonar.ce.configuration.CeConfigurationRule; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class EnabledCeWorkerControllerImplTest { + private Random random = new Random(); + /** 1 <= workerCount <= 5 */ + private int workerCount = 1 + random.nextInt(5); + + @Rule + public CeConfigurationRule ceConfigurationRule = new CeConfigurationRule() + .setWorkerCount(workerCount); + + private CeWorker ceWorker = mock(CeWorker.class); + private EnabledCeWorkerControllerImpl underTest = new EnabledCeWorkerControllerImpl(ceConfigurationRule); + + @Test + public void isEnabled_returns_true_if_worker_ordinal_is_less_than_CeConfiguration_workerCount() { + int ordinal = workerCount + Math.min(-1, -random.nextInt(workerCount)); + when(ceWorker.getOrdinal()).thenReturn(ordinal); + + assertThat(underTest.isEnabled(ceWorker)) + .as("For ordinal " + ordinal + " and workerCount " + workerCount) + .isTrue(); + } + + @Test + public void isEnabled_returns_false_if_worker_ordinal_is_equal_to_CeConfiguration_workerCount() { + when(ceWorker.getOrdinal()).thenReturn(workerCount); + + assertThat(underTest.isEnabled(ceWorker)).isFalse(); + } + + @Test + public void isEnabled_returns_true_if_ordinal_is_invalid() { + int ordinal = -1 - random.nextInt(3); + when(ceWorker.getOrdinal()).thenReturn(ordinal); + + assertThat(underTest.isEnabled(ceWorker)) + .as("For invalid ordinal " + ordinal + " and workerCount " + workerCount) + .isTrue(); + } + + @Test + public void workerCount_is_loaded_in_constructor() { + when(ceWorker.getOrdinal()).thenReturn(workerCount); + assertThat(underTest.isEnabled(ceWorker)).isFalse(); + + ceConfigurationRule.setWorkerCount(workerCount + 1); + assertThat(underTest.isEnabled(ceWorker)).isFalse(); + } +} -- 2.39.5