From 36becb8f4ca1a931fe350dc34a5a4fefd0dfd81b Mon Sep 17 00:00:00 2001 From: =?utf8?q?S=C3=A9bastien=20Lesaint?= Date: Tue, 18 Apr 2017 12:35:48 +0200 Subject: [PATCH] SONAR-8985 add unique identifier to CeWorker --- .../ce/configuration/CeConfiguration.java | 4 +- .../CeProcessingSchedulerExecutorService.java | 2 +- .../CeProcessingSchedulerImpl.java | 19 +++-- .../taskprocessor/CeTaskProcessorModule.java | 2 +- .../{CeWorkerCallable.java => CeWorker.java} | 3 +- .../ce/taskprocessor/CeWorkerFactory.java | 39 ++++++++++ .../ce/taskprocessor/CeWorkerFactoryImpl.java | 56 ++++++++++++++ ...kerCallableImpl.java => CeWorkerImpl.java} | 19 +++-- .../CeProcessingSchedulerImplTest.java | 73 ++++++++++++++----- .../CeWorkerFactoryImplTest.java | 73 +++++++++++++++++++ ...bleImplTest.java => CeWorkerImplTest.java} | 14 +++- 11 files changed, 265 insertions(+), 39 deletions(-) rename server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/{CeWorkerCallable.java => CeWorker.java} (94%) create mode 100644 server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactory.java create mode 100644 server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java rename server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/{CeWorkerCallableImpl.java => CeWorkerImpl.java} (91%) create mode 100644 server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java rename server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/{CeWorkerCallableImplTest.java => CeWorkerImplTest.java} (94%) diff --git a/server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfiguration.java b/server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfiguration.java index eedc8d4893f..dd27ba3f882 100644 --- a/server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfiguration.java +++ b/server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfiguration.java @@ -27,8 +27,8 @@ public interface CeConfiguration { int getWorkerCount(); /** - * The delay in milliseconds before calling another {@link org.sonar.server.computation.taskprocessor.CeWorkerCallable} - * when previous one had nothing to do. + * The delay in millisecond before a {@link CeWorker} shall try and find a task + * to process when it's previous execution had nothing to do. */ long getQueuePollingDelay(); diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorService.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorService.java index 2aa14e6962d..75097fd6d0c 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorService.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorService.java @@ -23,7 +23,7 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService; import org.sonar.server.util.StoppableExecutorService; /** - * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerCallableImpl}. + * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerImpl}. */ public interface CeProcessingSchedulerExecutorService extends StoppableExecutorService, ListeningScheduledExecutorService { } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java index 47f56af6ff1..fd145a6815d 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java @@ -39,16 +39,14 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class); private final CeProcessingSchedulerExecutorService executorService; - private final CeWorkerCallable workerRunnable; private final long delayBetweenTasks; private final TimeUnit timeUnit; private final ChainingCallback[] chainingCallbacks; public CeProcessingSchedulerImpl(CeConfiguration ceConfiguration, - CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerCallable workerRunnable) { + CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory) { this.executorService = processingExecutorService; - this.workerRunnable = workerRunnable; this.delayBetweenTasks = ceConfiguration.getQueuePollingDelay(); this.timeUnit = MILLISECONDS; @@ -56,7 +54,8 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab int workerCount = ceConfiguration.getWorkerCount(); this.chainingCallbacks = new ChainingCallback[workerCount]; for (int i = 0; i < workerCount; i++) { - chainingCallbacks[i] = new ChainingCallback(); + CeWorker worker = ceCeWorkerFactory.create(); + chainingCallbacks[i] = new ChainingCallback(worker); } } @@ -68,7 +67,7 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab @Override public void startScheduling() { for (ChainingCallback chainingCallback : chainingCallbacks) { - ListenableScheduledFuture future = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit); + ListenableScheduledFuture future = executorService.schedule(chainingCallback.worker, delayBetweenTasks, timeUnit); addCallback(future, chainingCallback, executorService); } } @@ -82,9 +81,15 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab private class ChainingCallback implements FutureCallback { private final AtomicBoolean keepRunning = new AtomicBoolean(true); + private final CeWorker worker; + @CheckForNull private ListenableFuture workerFuture; + public ChainingCallback(CeWorker worker) { + this.worker = worker; + } + @Override public void onSuccess(@Nullable Boolean result) { if (result != null && result) { @@ -105,14 +110,14 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab private void chainWithoutDelay() { if (keepRunning()) { - workerFuture = executorService.submit(workerRunnable); + workerFuture = executorService.submit(worker); } addCallback(); } private void chainWithDelay() { if (keepRunning()) { - workerFuture = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit); + workerFuture = executorService.schedule(worker, delayBetweenTasks, timeUnit); } addCallback(); } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java index b6f08854b8c..9163507abe9 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java @@ -26,7 +26,7 @@ public class CeTaskProcessorModule extends Module { protected void configureModule() { add( CeTaskProcessorRepositoryImpl.class, - CeWorkerCallableImpl.class, + CeWorkerFactoryImpl.class, CeProcessingSchedulerExecutorServiceImpl.class, CeProcessingSchedulerImpl.class); } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallable.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java similarity index 94% rename from server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallable.java rename to server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java index 2618b37b070..7390b4a1d41 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallable.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java @@ -28,5 +28,6 @@ import org.sonar.ce.queue.CeTask; * {@link Callable#call()} returns a Boolean which is {@code true} when some a {@link CeTask} was processed, * {@code false} otherwise. */ -public interface CeWorkerCallable extends Callable { +public interface CeWorker extends Callable { + String getUUID(); } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactory.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactory.java new file mode 100644 index 00000000000..65d9096da66 --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactory.java @@ -0,0 +1,39 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import java.util.Set; + +/** + * A factory that will create the CeWorkerFactory with an UUID + */ +public interface CeWorkerFactory { + /** + * Create a new CeWorker object. + * Each {@link CeWorker} returned by this method will have a different UUID from the others and all of these UUIDS will be returned by {@link #getWorkerUUIDs()}. + * + * @return the CeWorker + */ + CeWorker create(); + /** + * @return the UUIDs of each {@link CeWorker} object returned by {@link #create}. + */ + Set getWorkerUUIDs(); +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java new file mode 100644 index 00000000000..e684caa7fe7 --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java @@ -0,0 +1,56 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package org.sonar.ce.taskprocessor; + +import java.util.HashSet; +import java.util.Set; +import org.sonar.ce.log.CeLogging; +import org.sonar.ce.queue.InternalCeQueue; +import org.sonar.core.util.UuidFactory; + +import static com.google.common.collect.ImmutableSet.copyOf; + +public class CeWorkerFactoryImpl implements CeWorkerFactory { + private final UuidFactory uuidFactory; + private final Set ceWorkerUUIDs = new HashSet<>(); + private final InternalCeQueue queue; + private final CeLogging ceLogging; + private final CeTaskProcessorRepository taskProcessorRepository; + + public CeWorkerFactoryImpl(InternalCeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository, UuidFactory uuidFactory) { + this.queue = queue; + this.ceLogging = ceLogging; + this.taskProcessorRepository = taskProcessorRepository; + this.uuidFactory= uuidFactory; + } + + @Override + public CeWorker create() { + String uuid = uuidFactory.create(); + ceWorkerUUIDs.add(uuid); + return new CeWorkerImpl(queue, ceLogging, taskProcessorRepository, uuid); + } + + @Override + public Set getWorkerUUIDs() { + return copyOf(ceWorkerUUIDs); + } +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java similarity index 91% rename from server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java rename to server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java index eb6d9d2ee24..025f6366655 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java @@ -20,7 +20,6 @@ package org.sonar.ce.taskprocessor; import java.util.Optional; -import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import org.sonar.api.utils.log.Logger; import org.sonar.api.utils.log.Loggers; @@ -33,18 +32,20 @@ import org.sonar.db.ce.CeActivityDto; import static java.lang.String.format; -public class CeWorkerCallableImpl implements CeWorkerCallable { +public class CeWorkerImpl implements CeWorker { - private static final Logger LOG = Loggers.get(CeWorkerCallableImpl.class); + private static final Logger LOG = Loggers.get(CeWorkerImpl.class); private final InternalCeQueue queue; private final CeLogging ceLogging; private final CeTaskProcessorRepository taskProcessorRepository; + private final String uuid; - public CeWorkerCallableImpl(InternalCeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository) { + public CeWorkerImpl(InternalCeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository, String uuid) { this.queue = queue; this.ceLogging = ceLogging; this.taskProcessorRepository = taskProcessorRepository; + this.uuid = uuid; } @Override @@ -62,16 +63,21 @@ public class CeWorkerCallableImpl implements CeWorkerCallable { return true; } - private static final AtomicLong counter = new AtomicLong(0); + private Optional tryAndFindTaskToExecute() { try { - return queue.peek("uuid" + counter.addAndGet(100)); + return queue.peek(uuid); } catch (Exception e) { LOG.error("Failed to pop the queue of analysis reports", e); } return Optional.empty(); } + @Override + public String getUUID() { + return uuid; + } + private void executeTask(CeTask task) { ceLogging.initForTask(task); Profiler ceProfiler = startActivityProfiler(task); @@ -135,5 +141,4 @@ public class CeWorkerCallableImpl implements CeWorkerCallable { profiler.addContext("submitter", submitterLogin); } } - } diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java index d3480186bd2..43585cf63d1 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java @@ -25,10 +25,12 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Queue; import java.util.Random; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Delayed; @@ -45,11 +47,14 @@ import org.junit.Test; import org.junit.rules.Timeout; import org.sonar.ce.configuration.CeConfigurationRule; +import static com.google.common.collect.ImmutableList.copyOf; +import static java.util.Collections.emptySet; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -62,17 +67,18 @@ public class CeProcessingSchedulerImplTest { public Timeout timeout = Timeout.seconds(60); @Rule public CeConfigurationRule ceConfiguration = new CeConfigurationRule(); - - private CeWorkerCallable ceWorkerRunnable = mock(CeWorkerCallable.class); + // Required to prevent an infinite loop + private CeWorker ceWorker = mock(CeWorker.class); + private CeWorkerFactory ceWorkerFactory = new TestCeWorkerFactory(ceWorker); private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService(); - private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorkerRunnable, 2000L, TimeUnit.MILLISECONDS); - private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorkerRunnable); + private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorker, 2000L, MILLISECONDS); + private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorker); - private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerRunnable); + private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory); @Test public void polls_without_delay_when_CeWorkerCallable_returns_true() throws Exception { - when(ceWorkerRunnable.call()) + when(ceWorker.call()) .thenReturn(true) .thenThrow(ERROR_TO_INTERRUPT_CHAINING); @@ -86,7 +92,7 @@ public class CeProcessingSchedulerImplTest { @Test public void polls_without_delay_when_CeWorkerCallable_throws_Exception_but_not_Error() throws Exception { - when(ceWorkerRunnable.call()) + when(ceWorker.call()) .thenThrow(new Exception("Exception is followed by a poll without delay")) .thenThrow(ERROR_TO_INTERRUPT_CHAINING); @@ -100,7 +106,7 @@ public class CeProcessingSchedulerImplTest { @Test public void polls_with_regular_delay_when_CeWorkerCallable_returns_false() throws Exception { - when(ceWorkerRunnable.call()) + when(ceWorker.call()) .thenReturn(false) .thenThrow(ERROR_TO_INTERRUPT_CHAINING); @@ -114,7 +120,7 @@ public class CeProcessingSchedulerImplTest { @Test public void startScheduling_schedules_CeWorkerCallable_at_fixed_rate_run_head_of_queue() throws Exception { - when(ceWorkerRunnable.call()) + when(ceWorker.call()) .thenReturn(true) .thenReturn(true) .thenReturn(false) @@ -144,7 +150,7 @@ public class CeProcessingSchedulerImplTest { @Test public void stop_cancels_next_polling_and_does_not_add_any_new_one() throws Exception { - when(ceWorkerRunnable.call()) + when(ceWorker.call()) .thenReturn(false) .thenReturn(true) .thenReturn(false) @@ -182,21 +188,36 @@ public class CeProcessingSchedulerImplTest { } @Test - public void when_workerCount_is_more_than_1_as_many_CeWorkerCallable_are_scheduled() throws InterruptedException { + public void when_workerCount_is_more_than_1_as_many_CeWorkerCallable_are_scheduled() throws Exception { int workerCount = Math.abs(new Random().nextInt(10)) + 1; - ceConfiguration.setWorkerCount(workerCount); + CeWorker[] workers = new CeWorker[workerCount]; + for (int i = 0; i < workerCount; i++) { + workers[i] = mock(CeWorker.class); + when(workers[i].call()) + .thenReturn(false) + .thenThrow(ERROR_TO_INTERRUPT_CHAINING); + } + ListenableScheduledFuture listenableScheduledFuture = mock(ListenableScheduledFuture.class); CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class); - CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerRunnable); - when(processingExecutorService.schedule(ceWorkerRunnable, ceConfiguration.getQueuePollingDelay(), MILLISECONDS)) + when(processingExecutorService.schedule(any(CeWorker.class), any(Long.class),any(TimeUnit.class))).thenReturn(listenableScheduledFuture); + + CeWorkerFactory ceWorkerFactory = spy(new TestCeWorkerFactory(workers)); + CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory); + when(processingExecutorService.schedule(ceWorker, ceConfiguration.getQueuePollingDelay(), MILLISECONDS)) .thenReturn(listenableScheduledFuture); underTest.startScheduling(); + // No exception from TestCeWorkerFactory must be thrown - verify(processingExecutorService, times(workerCount)).schedule(ceWorkerRunnable, ceConfiguration.getQueuePollingDelay(), MILLISECONDS); + // Verify that schedule has been called on all workers + for (int i = 0; i < workerCount; i++) { + verify(processingExecutorService).schedule(workers[i], ceConfiguration.getQueuePollingDelay(), MILLISECONDS); + } verify(listenableScheduledFuture, times(workerCount)).addListener(any(Runnable.class), eq(processingExecutorService)); + verify(ceWorkerFactory, times(workerCount)).create(); } private void startSchedulingAndRun() throws ExecutionException, InterruptedException { @@ -206,6 +227,25 @@ public class CeProcessingSchedulerImplTest { processingExecutorService.runFutures(); } + private class TestCeWorkerFactory implements CeWorkerFactory { + private final Iterator ceWorkers; + + private TestCeWorkerFactory(CeWorker... ceWorkers) { + this.ceWorkers = copyOf(ceWorkers).iterator(); + } + + @Override + public CeWorker create() { + // This will throw an NoSuchElementException if there are too many calls + return ceWorkers.next(); + } + + @Override + public Set getWorkerUUIDs() { + return emptySet(); + } + } + /** * A synchronous implementation of {@link CeProcessingSchedulerExecutorService} which exposes a synchronous * method to execute futures it creates and exposes a method to retrieve logs of calls to @@ -384,7 +424,7 @@ public class CeProcessingSchedulerImplTest { command.run(); } - // ///////// unsupported operations /////////// + /////////// unsupported operations /////////// @Override public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { @@ -538,5 +578,4 @@ public class CeProcessingSchedulerImplTest { '}'; } } - } diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java new file mode 100644 index 00000000000..1f3b4aaea09 --- /dev/null +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java @@ -0,0 +1,73 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package org.sonar.ce.taskprocessor; + +import java.util.HashSet; +import java.util.Set; +import org.junit.Test; +import org.sonar.ce.log.CeLogging; +import org.sonar.ce.queue.InternalCeQueue; +import org.sonar.core.util.UuidFactoryImpl; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +public class CeWorkerFactoryImplTest { + private CeWorkerFactoryImpl underTest = new CeWorkerFactoryImpl(mock(InternalCeQueue.class), mock(CeLogging.class), + mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE); + + @Test + public void each_call_must_return_a_new_ceworker_with_unique_uuid() { + Set ceWorkers = new HashSet<>(); + Set ceWorkerUUIDs = new HashSet<>(); + + for (int i = 0; i < 10; i++) { + CeWorker ceWorker = underTest.create(); + ceWorkers.add(ceWorker); + ceWorkerUUIDs.add(ceWorker.getUUID()); + } + + assertThat(ceWorkers).hasSize(10); + assertThat(ceWorkerUUIDs).hasSize(10); + } + + @Test + public void ceworker_created_by_factory_must_contain_uuid() { + CeWorker ceWorker = underTest.create(); + assertThat(ceWorker.getUUID()).isNotEmpty(); + } + + @Test + public void CeWorkerFactory_has_an_empty_set_of_uuids_when_created() { + assertThat(underTest.getWorkerUUIDs()).isEmpty(); + } + + @Test + public void CeWorkerFactory_must_returns_the_uuids_of_worker() { + Set ceWorkerUUIDs = new HashSet<>(); + + for (int i = 0; i < 10; i++) { + ceWorkerUUIDs.add(underTest.create().getUUID()); + } + + assertThat(underTest.getWorkerUUIDs()).isEqualTo(ceWorkerUUIDs); + } +} diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java similarity index 94% rename from server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java rename to server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java index 4fc5aee88b4..155507bb171 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java @@ -21,6 +21,7 @@ package org.sonar.ce.taskprocessor; import java.util.List; import java.util.Optional; +import java.util.UUID; import javax.annotation.Nullable; import org.junit.Rule; import org.junit.Test; @@ -45,7 +46,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -public class CeWorkerCallableImplTest { +public class CeWorkerImplTest { @Rule public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule(); @@ -56,9 +57,16 @@ public class CeWorkerCallableImplTest { private ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class); private CeLogging ceLogging = spy(CeLogging.class); private ArgumentCaptor workerUuid = ArgumentCaptor.forClass(String.class); - private CeWorkerCallable underTest = new CeWorkerCallableImpl(queue, ceLogging, taskProcessorRepository); + private CeWorker underTest = new CeWorkerImpl(queue, ceLogging, taskProcessorRepository, UUID.randomUUID().toString()); private InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue); + @Test + public void getUUID_must_return_the_uuid_of_constructor() { + String uuid = UUID.randomUUID().toString(); + CeWorker underTest = new CeWorkerImpl(queue, ceLogging, taskProcessorRepository, uuid); + assertThat(underTest.getUUID()).isEqualTo(uuid); + } + @Test public void no_pending_tasks_in_queue() throws Exception { when(queue.peek(anyString())).thenReturn(Optional.empty()); @@ -226,7 +234,7 @@ public class CeWorkerCallableImplTest { private void verifyWorkerUuid() { verify(queue).peek(workerUuid.capture()); - assertThat(workerUuid.getValue()).startsWith("uuid"); + assertThat(workerUuid.getValue()).startsWith(workerUuid.getValue()); } private static CeTask createCeTask(@Nullable String submitterLogin) { -- 2.39.5