aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>2017-04-18 12:35:48 +0200
committerEric Hartmann <hartmann.eric@gmail.Com>2017-04-27 09:23:18 +0200
commit36becb8f4ca1a931fe350dc34a5a4fefd0dfd81b (patch)
treef562456007f0e9461ab97fbe89825981425d452e
parentd463c9d9db163c0dca93a8ac720b50d5251fd5f9 (diff)
downloadsonarqube-36becb8f4ca1a931fe350dc34a5a4fefd0dfd81b.tar.gz
sonarqube-36becb8f4ca1a931fe350dc34a5a4fefd0dfd81b.zip
SONAR-8985 add unique identifier to CeWorker
-rw-r--r--server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfiguration.java4
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorService.java2
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java19
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java2
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java (renamed from server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallable.java)3
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactory.java39
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java56
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java (renamed from server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java)19
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java73
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java73
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java (renamed from server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java)14
11 files changed, 265 insertions, 39 deletions
diff --git a/server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfiguration.java b/server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfiguration.java
index eedc8d4893f..dd27ba3f882 100644
--- a/server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfiguration.java
+++ b/server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfiguration.java
@@ -27,8 +27,8 @@ public interface CeConfiguration {
int getWorkerCount();
/**
- * The delay in milliseconds before calling another {@link org.sonar.server.computation.taskprocessor.CeWorkerCallable}
- * when previous one had nothing to do.
+ * The delay in millisecond before a {@link CeWorker} shall try and find a task
+ * to process when it's previous execution had nothing to do.
*/
long getQueuePollingDelay();
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorService.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorService.java
index 2aa14e6962d..75097fd6d0c 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorService.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorService.java
@@ -23,7 +23,7 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import org.sonar.server.util.StoppableExecutorService;
/**
- * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerCallableImpl}.
+ * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerImpl}.
*/
public interface CeProcessingSchedulerExecutorService extends StoppableExecutorService, ListeningScheduledExecutorService {
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
index 47f56af6ff1..fd145a6815d 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
@@ -39,16 +39,14 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class);
private final CeProcessingSchedulerExecutorService executorService;
- private final CeWorkerCallable workerRunnable;
private final long delayBetweenTasks;
private final TimeUnit timeUnit;
private final ChainingCallback[] chainingCallbacks;
public CeProcessingSchedulerImpl(CeConfiguration ceConfiguration,
- CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerCallable workerRunnable) {
+ CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory) {
this.executorService = processingExecutorService;
- this.workerRunnable = workerRunnable;
this.delayBetweenTasks = ceConfiguration.getQueuePollingDelay();
this.timeUnit = MILLISECONDS;
@@ -56,7 +54,8 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
int workerCount = ceConfiguration.getWorkerCount();
this.chainingCallbacks = new ChainingCallback[workerCount];
for (int i = 0; i < workerCount; i++) {
- chainingCallbacks[i] = new ChainingCallback();
+ CeWorker worker = ceCeWorkerFactory.create();
+ chainingCallbacks[i] = new ChainingCallback(worker);
}
}
@@ -68,7 +67,7 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
@Override
public void startScheduling() {
for (ChainingCallback chainingCallback : chainingCallbacks) {
- ListenableScheduledFuture<Boolean> future = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit);
+ ListenableScheduledFuture<Boolean> future = executorService.schedule(chainingCallback.worker, delayBetweenTasks, timeUnit);
addCallback(future, chainingCallback, executorService);
}
}
@@ -82,9 +81,15 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
private class ChainingCallback implements FutureCallback<Boolean> {
private final AtomicBoolean keepRunning = new AtomicBoolean(true);
+ private final CeWorker worker;
+
@CheckForNull
private ListenableFuture<Boolean> workerFuture;
+ public ChainingCallback(CeWorker worker) {
+ this.worker = worker;
+ }
+
@Override
public void onSuccess(@Nullable Boolean result) {
if (result != null && result) {
@@ -105,14 +110,14 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
private void chainWithoutDelay() {
if (keepRunning()) {
- workerFuture = executorService.submit(workerRunnable);
+ workerFuture = executorService.submit(worker);
}
addCallback();
}
private void chainWithDelay() {
if (keepRunning()) {
- workerFuture = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit);
+ workerFuture = executorService.schedule(worker, delayBetweenTasks, timeUnit);
}
addCallback();
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java
index b6f08854b8c..9163507abe9 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java
@@ -26,7 +26,7 @@ public class CeTaskProcessorModule extends Module {
protected void configureModule() {
add(
CeTaskProcessorRepositoryImpl.class,
- CeWorkerCallableImpl.class,
+ CeWorkerFactoryImpl.class,
CeProcessingSchedulerExecutorServiceImpl.class,
CeProcessingSchedulerImpl.class);
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallable.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java
index 2618b37b070..7390b4a1d41 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallable.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java
@@ -28,5 +28,6 @@ import org.sonar.ce.queue.CeTask;
* {@link Callable#call()} returns a Boolean which is {@code true} when some a {@link CeTask} was processed,
* {@code false} otherwise.
*/
-public interface CeWorkerCallable extends Callable<Boolean> {
+public interface CeWorker extends Callable<Boolean> {
+ String getUUID();
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactory.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactory.java
new file mode 100644
index 00000000000..65d9096da66
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactory.java
@@ -0,0 +1,39 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.Set;
+
+/**
+ * A factory that will create the CeWorkerFactory with an UUID
+ */
+public interface CeWorkerFactory {
+ /**
+ * Create a new CeWorker object.
+ * Each {@link CeWorker} returned by this method will have a different UUID from the others and all of these UUIDS will be returned by {@link #getWorkerUUIDs()}.
+ *
+ * @return the CeWorker
+ */
+ CeWorker create();
+ /**
+ * @return the UUIDs of each {@link CeWorker} object returned by {@link #create}.
+ */
+ Set<String> getWorkerUUIDs();
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java
new file mode 100644
index 00000000000..e684caa7fe7
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java
@@ -0,0 +1,56 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package org.sonar.ce.taskprocessor;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.sonar.ce.log.CeLogging;
+import org.sonar.ce.queue.InternalCeQueue;
+import org.sonar.core.util.UuidFactory;
+
+import static com.google.common.collect.ImmutableSet.copyOf;
+
+public class CeWorkerFactoryImpl implements CeWorkerFactory {
+ private final UuidFactory uuidFactory;
+ private final Set<String> ceWorkerUUIDs = new HashSet<>();
+ private final InternalCeQueue queue;
+ private final CeLogging ceLogging;
+ private final CeTaskProcessorRepository taskProcessorRepository;
+
+ public CeWorkerFactoryImpl(InternalCeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository, UuidFactory uuidFactory) {
+ this.queue = queue;
+ this.ceLogging = ceLogging;
+ this.taskProcessorRepository = taskProcessorRepository;
+ this.uuidFactory= uuidFactory;
+ }
+
+ @Override
+ public CeWorker create() {
+ String uuid = uuidFactory.create();
+ ceWorkerUUIDs.add(uuid);
+ return new CeWorkerImpl(queue, ceLogging, taskProcessorRepository, uuid);
+ }
+
+ @Override
+ public Set<String> getWorkerUUIDs() {
+ return copyOf(ceWorkerUUIDs);
+ }
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
index eb6d9d2ee24..025f6366655 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
@@ -20,7 +20,6 @@
package org.sonar.ce.taskprocessor;
import java.util.Optional;
-import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.sonar.api.utils.log.Logger;
import org.sonar.api.utils.log.Loggers;
@@ -33,18 +32,20 @@ import org.sonar.db.ce.CeActivityDto;
import static java.lang.String.format;
-public class CeWorkerCallableImpl implements CeWorkerCallable {
+public class CeWorkerImpl implements CeWorker {
- private static final Logger LOG = Loggers.get(CeWorkerCallableImpl.class);
+ private static final Logger LOG = Loggers.get(CeWorkerImpl.class);
private final InternalCeQueue queue;
private final CeLogging ceLogging;
private final CeTaskProcessorRepository taskProcessorRepository;
+ private final String uuid;
- public CeWorkerCallableImpl(InternalCeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository) {
+ public CeWorkerImpl(InternalCeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository, String uuid) {
this.queue = queue;
this.ceLogging = ceLogging;
this.taskProcessorRepository = taskProcessorRepository;
+ this.uuid = uuid;
}
@Override
@@ -62,16 +63,21 @@ public class CeWorkerCallableImpl implements CeWorkerCallable {
return true;
}
- private static final AtomicLong counter = new AtomicLong(0);
+
private Optional<CeTask> tryAndFindTaskToExecute() {
try {
- return queue.peek("uuid" + counter.addAndGet(100));
+ return queue.peek(uuid);
} catch (Exception e) {
LOG.error("Failed to pop the queue of analysis reports", e);
}
return Optional.empty();
}
+ @Override
+ public String getUUID() {
+ return uuid;
+ }
+
private void executeTask(CeTask task) {
ceLogging.initForTask(task);
Profiler ceProfiler = startActivityProfiler(task);
@@ -135,5 +141,4 @@ public class CeWorkerCallableImpl implements CeWorkerCallable {
profiler.addContext("submitter", submitterLogin);
}
}
-
}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
index d3480186bd2..43585cf63d1 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
@@ -25,10 +25,12 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Delayed;
@@ -45,11 +47,14 @@ import org.junit.Test;
import org.junit.rules.Timeout;
import org.sonar.ce.configuration.CeConfigurationRule;
+import static com.google.common.collect.ImmutableList.copyOf;
+import static java.util.Collections.emptySet;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -62,17 +67,18 @@ public class CeProcessingSchedulerImplTest {
public Timeout timeout = Timeout.seconds(60);
@Rule
public CeConfigurationRule ceConfiguration = new CeConfigurationRule();
-
- private CeWorkerCallable ceWorkerRunnable = mock(CeWorkerCallable.class);
+ // Required to prevent an infinite loop
+ private CeWorker ceWorker = mock(CeWorker.class);
+ private CeWorkerFactory ceWorkerFactory = new TestCeWorkerFactory(ceWorker);
private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService();
- private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorkerRunnable, 2000L, TimeUnit.MILLISECONDS);
- private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorkerRunnable);
+ private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorker, 2000L, MILLISECONDS);
+ private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorker);
- private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerRunnable);
+ private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory);
@Test
public void polls_without_delay_when_CeWorkerCallable_returns_true() throws Exception {
- when(ceWorkerRunnable.call())
+ when(ceWorker.call())
.thenReturn(true)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
@@ -86,7 +92,7 @@ public class CeProcessingSchedulerImplTest {
@Test
public void polls_without_delay_when_CeWorkerCallable_throws_Exception_but_not_Error() throws Exception {
- when(ceWorkerRunnable.call())
+ when(ceWorker.call())
.thenThrow(new Exception("Exception is followed by a poll without delay"))
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
@@ -100,7 +106,7 @@ public class CeProcessingSchedulerImplTest {
@Test
public void polls_with_regular_delay_when_CeWorkerCallable_returns_false() throws Exception {
- when(ceWorkerRunnable.call())
+ when(ceWorker.call())
.thenReturn(false)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
@@ -114,7 +120,7 @@ public class CeProcessingSchedulerImplTest {
@Test
public void startScheduling_schedules_CeWorkerCallable_at_fixed_rate_run_head_of_queue() throws Exception {
- when(ceWorkerRunnable.call())
+ when(ceWorker.call())
.thenReturn(true)
.thenReturn(true)
.thenReturn(false)
@@ -144,7 +150,7 @@ public class CeProcessingSchedulerImplTest {
@Test
public void stop_cancels_next_polling_and_does_not_add_any_new_one() throws Exception {
- when(ceWorkerRunnable.call())
+ when(ceWorker.call())
.thenReturn(false)
.thenReturn(true)
.thenReturn(false)
@@ -182,21 +188,36 @@ public class CeProcessingSchedulerImplTest {
}
@Test
- public void when_workerCount_is_more_than_1_as_many_CeWorkerCallable_are_scheduled() throws InterruptedException {
+ public void when_workerCount_is_more_than_1_as_many_CeWorkerCallable_are_scheduled() throws Exception {
int workerCount = Math.abs(new Random().nextInt(10)) + 1;
-
ceConfiguration.setWorkerCount(workerCount);
+ CeWorker[] workers = new CeWorker[workerCount];
+ for (int i = 0; i < workerCount; i++) {
+ workers[i] = mock(CeWorker.class);
+ when(workers[i].call())
+ .thenReturn(false)
+ .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+ }
+
ListenableScheduledFuture listenableScheduledFuture = mock(ListenableScheduledFuture.class);
CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class);
- CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerRunnable);
- when(processingExecutorService.schedule(ceWorkerRunnable, ceConfiguration.getQueuePollingDelay(), MILLISECONDS))
+ when(processingExecutorService.schedule(any(CeWorker.class), any(Long.class),any(TimeUnit.class))).thenReturn(listenableScheduledFuture);
+
+ CeWorkerFactory ceWorkerFactory = spy(new TestCeWorkerFactory(workers));
+ CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory);
+ when(processingExecutorService.schedule(ceWorker, ceConfiguration.getQueuePollingDelay(), MILLISECONDS))
.thenReturn(listenableScheduledFuture);
underTest.startScheduling();
+ // No exception from TestCeWorkerFactory must be thrown
- verify(processingExecutorService, times(workerCount)).schedule(ceWorkerRunnable, ceConfiguration.getQueuePollingDelay(), MILLISECONDS);
+ // Verify that schedule has been called on all workers
+ for (int i = 0; i < workerCount; i++) {
+ verify(processingExecutorService).schedule(workers[i], ceConfiguration.getQueuePollingDelay(), MILLISECONDS);
+ }
verify(listenableScheduledFuture, times(workerCount)).addListener(any(Runnable.class), eq(processingExecutorService));
+ verify(ceWorkerFactory, times(workerCount)).create();
}
private void startSchedulingAndRun() throws ExecutionException, InterruptedException {
@@ -206,6 +227,25 @@ public class CeProcessingSchedulerImplTest {
processingExecutorService.runFutures();
}
+ private class TestCeWorkerFactory implements CeWorkerFactory {
+ private final Iterator<CeWorker> ceWorkers;
+
+ private TestCeWorkerFactory(CeWorker... ceWorkers) {
+ this.ceWorkers = copyOf(ceWorkers).iterator();
+ }
+
+ @Override
+ public CeWorker create() {
+ // This will throw an NoSuchElementException if there are too many calls
+ return ceWorkers.next();
+ }
+
+ @Override
+ public Set<String> getWorkerUUIDs() {
+ return emptySet();
+ }
+ }
+
/**
* A synchronous implementation of {@link CeProcessingSchedulerExecutorService} which exposes a synchronous
* method to execute futures it creates and exposes a method to retrieve logs of calls to
@@ -384,7 +424,7 @@ public class CeProcessingSchedulerImplTest {
command.run();
}
- // ///////// unsupported operations ///////////
+ /////////// unsupported operations ///////////
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
@@ -538,5 +578,4 @@ public class CeProcessingSchedulerImplTest {
'}';
}
}
-
}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java
new file mode 100644
index 00000000000..1f3b4aaea09
--- /dev/null
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java
@@ -0,0 +1,73 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package org.sonar.ce.taskprocessor;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.junit.Test;
+import org.sonar.ce.log.CeLogging;
+import org.sonar.ce.queue.InternalCeQueue;
+import org.sonar.core.util.UuidFactoryImpl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class CeWorkerFactoryImplTest {
+ private CeWorkerFactoryImpl underTest = new CeWorkerFactoryImpl(mock(InternalCeQueue.class), mock(CeLogging.class),
+ mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE);
+
+ @Test
+ public void each_call_must_return_a_new_ceworker_with_unique_uuid() {
+ Set<CeWorker> ceWorkers = new HashSet<>();
+ Set<String> ceWorkerUUIDs = new HashSet<>();
+
+ for (int i = 0; i < 10; i++) {
+ CeWorker ceWorker = underTest.create();
+ ceWorkers.add(ceWorker);
+ ceWorkerUUIDs.add(ceWorker.getUUID());
+ }
+
+ assertThat(ceWorkers).hasSize(10);
+ assertThat(ceWorkerUUIDs).hasSize(10);
+ }
+
+ @Test
+ public void ceworker_created_by_factory_must_contain_uuid() {
+ CeWorker ceWorker = underTest.create();
+ assertThat(ceWorker.getUUID()).isNotEmpty();
+ }
+
+ @Test
+ public void CeWorkerFactory_has_an_empty_set_of_uuids_when_created() {
+ assertThat(underTest.getWorkerUUIDs()).isEmpty();
+ }
+
+ @Test
+ public void CeWorkerFactory_must_returns_the_uuids_of_worker() {
+ Set<String> ceWorkerUUIDs = new HashSet<>();
+
+ for (int i = 0; i < 10; i++) {
+ ceWorkerUUIDs.add(underTest.create().getUUID());
+ }
+
+ assertThat(underTest.getWorkerUUIDs()).isEqualTo(ceWorkerUUIDs);
+ }
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java
index 4fc5aee88b4..155507bb171 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java
@@ -21,6 +21,7 @@ package org.sonar.ce.taskprocessor;
import java.util.List;
import java.util.Optional;
+import java.util.UUID;
import javax.annotation.Nullable;
import org.junit.Rule;
import org.junit.Test;
@@ -45,7 +46,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
-public class CeWorkerCallableImplTest {
+public class CeWorkerImplTest {
@Rule
public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule();
@@ -56,10 +57,17 @@ public class CeWorkerCallableImplTest {
private ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
private CeLogging ceLogging = spy(CeLogging.class);
private ArgumentCaptor<String> workerUuid = ArgumentCaptor.forClass(String.class);
- private CeWorkerCallable underTest = new CeWorkerCallableImpl(queue, ceLogging, taskProcessorRepository);
+ private CeWorker underTest = new CeWorkerImpl(queue, ceLogging, taskProcessorRepository, UUID.randomUUID().toString());
private InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
@Test
+ public void getUUID_must_return_the_uuid_of_constructor() {
+ String uuid = UUID.randomUUID().toString();
+ CeWorker underTest = new CeWorkerImpl(queue, ceLogging, taskProcessorRepository, uuid);
+ assertThat(underTest.getUUID()).isEqualTo(uuid);
+ }
+
+ @Test
public void no_pending_tasks_in_queue() throws Exception {
when(queue.peek(anyString())).thenReturn(Optional.empty());
@@ -226,7 +234,7 @@ public class CeWorkerCallableImplTest {
private void verifyWorkerUuid() {
verify(queue).peek(workerUuid.capture());
- assertThat(workerUuid.getValue()).startsWith("uuid");
+ assertThat(workerUuid.getValue()).startsWith(workerUuid.getValue());
}
private static CeTask createCeTask(@Nullable String submitterLogin) {