int getWorkerCount();
/**
- * The delay in milliseconds before calling another {@link org.sonar.server.computation.taskprocessor.CeWorkerCallable}
- * when previous one had nothing to do.
+ * The delay in millisecond before a {@link CeWorker} shall try and find a task
+ * to process when it's previous execution had nothing to do.
*/
long getQueuePollingDelay();
import org.sonar.server.util.StoppableExecutorService;
/**
- * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerCallableImpl}.
+ * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerImpl}.
*/
public interface CeProcessingSchedulerExecutorService extends StoppableExecutorService, ListeningScheduledExecutorService {
}
private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class);
private final CeProcessingSchedulerExecutorService executorService;
- private final CeWorkerCallable workerRunnable;
private final long delayBetweenTasks;
private final TimeUnit timeUnit;
private final ChainingCallback[] chainingCallbacks;
public CeProcessingSchedulerImpl(CeConfiguration ceConfiguration,
- CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerCallable workerRunnable) {
+ CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory) {
this.executorService = processingExecutorService;
- this.workerRunnable = workerRunnable;
this.delayBetweenTasks = ceConfiguration.getQueuePollingDelay();
this.timeUnit = MILLISECONDS;
int workerCount = ceConfiguration.getWorkerCount();
this.chainingCallbacks = new ChainingCallback[workerCount];
for (int i = 0; i < workerCount; i++) {
- chainingCallbacks[i] = new ChainingCallback();
+ CeWorker worker = ceCeWorkerFactory.create();
+ chainingCallbacks[i] = new ChainingCallback(worker);
}
}
@Override
public void startScheduling() {
for (ChainingCallback chainingCallback : chainingCallbacks) {
- ListenableScheduledFuture<Boolean> future = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit);
+ ListenableScheduledFuture<Boolean> future = executorService.schedule(chainingCallback.worker, delayBetweenTasks, timeUnit);
addCallback(future, chainingCallback, executorService);
}
}
private class ChainingCallback implements FutureCallback<Boolean> {
private final AtomicBoolean keepRunning = new AtomicBoolean(true);
+ private final CeWorker worker;
+
@CheckForNull
private ListenableFuture<Boolean> workerFuture;
+ public ChainingCallback(CeWorker worker) {
+ this.worker = worker;
+ }
+
@Override
public void onSuccess(@Nullable Boolean result) {
if (result != null && result) {
private void chainWithoutDelay() {
if (keepRunning()) {
- workerFuture = executorService.submit(workerRunnable);
+ workerFuture = executorService.submit(worker);
}
addCallback();
}
private void chainWithDelay() {
if (keepRunning()) {
- workerFuture = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit);
+ workerFuture = executorService.schedule(worker, delayBetweenTasks, timeUnit);
}
addCallback();
}
protected void configureModule() {
add(
CeTaskProcessorRepositoryImpl.class,
- CeWorkerCallableImpl.class,
+ CeWorkerFactoryImpl.class,
CeProcessingSchedulerExecutorServiceImpl.class,
CeProcessingSchedulerImpl.class);
}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.concurrent.Callable;
+import org.sonar.ce.queue.CeQueue;
+import org.sonar.ce.queue.CeTask;
+
+/**
+ * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}.
+ * {@link Callable#call()} returns a Boolean which is {@code true} when some a {@link CeTask} was processed,
+ * {@code false} otherwise.
+ */
+public interface CeWorker extends Callable<Boolean> {
+ String getUUID();
+}
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2017 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.ce.taskprocessor;
-
-import java.util.concurrent.Callable;
-import org.sonar.ce.queue.CeQueue;
-import org.sonar.ce.queue.CeTask;
-
-/**
- * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}.
- * {@link Callable#call()} returns a Boolean which is {@code true} when some a {@link CeTask} was processed,
- * {@code false} otherwise.
- */
-public interface CeWorkerCallable extends Callable<Boolean> {
-}
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2017 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.ce.taskprocessor;
-
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Nullable;
-import org.sonar.api.utils.log.Logger;
-import org.sonar.api.utils.log.Loggers;
-import org.sonar.ce.log.CeLogging;
-import org.sonar.ce.queue.CeTask;
-import org.sonar.ce.queue.CeTaskResult;
-import org.sonar.ce.queue.InternalCeQueue;
-import org.sonar.core.util.logs.Profiler;
-import org.sonar.db.ce.CeActivityDto;
-
-import static java.lang.String.format;
-
-public class CeWorkerCallableImpl implements CeWorkerCallable {
-
- private static final Logger LOG = Loggers.get(CeWorkerCallableImpl.class);
-
- private final InternalCeQueue queue;
- private final CeLogging ceLogging;
- private final CeTaskProcessorRepository taskProcessorRepository;
-
- public CeWorkerCallableImpl(InternalCeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository) {
- this.queue = queue;
- this.ceLogging = ceLogging;
- this.taskProcessorRepository = taskProcessorRepository;
- }
-
- @Override
- public Boolean call() throws Exception {
- Optional<CeTask> ceTask = tryAndFindTaskToExecute();
- if (!ceTask.isPresent()) {
- return false;
- }
-
- try {
- executeTask(ceTask.get());
- } catch (Exception e) {
- LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e);
- }
- return true;
- }
-
- private static final AtomicLong counter = new AtomicLong(0);
- private Optional<CeTask> tryAndFindTaskToExecute() {
- try {
- return queue.peek("uuid" + counter.addAndGet(100));
- } catch (Exception e) {
- LOG.error("Failed to pop the queue of analysis reports", e);
- }
- return Optional.empty();
- }
-
- private void executeTask(CeTask task) {
- ceLogging.initForTask(task);
- Profiler ceProfiler = startActivityProfiler(task);
-
- CeActivityDto.Status status = CeActivityDto.Status.FAILED;
- CeTaskResult taskResult = null;
- Throwable error = null;
- try {
- // TODO delegate the message to the related task processor, according to task type
- Optional<CeTaskProcessor> taskProcessor = taskProcessorRepository.getForCeTask(task);
- if (taskProcessor.isPresent()) {
- taskResult = taskProcessor.get().process(task);
- status = CeActivityDto.Status.SUCCESS;
- } else {
- LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType());
- status = CeActivityDto.Status.FAILED;
- }
- } catch (Throwable e) {
- LOG.error(format("Failed to execute task %s", task.getUuid()), e);
- error = e;
- } finally {
- finalizeTask(task, ceProfiler, status, taskResult, error);
- }
- }
-
- private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status,
- @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
- try {
- queue.remove(task, status, taskResult, error);
- } catch (Exception e) {
- LOG.error(format("Failed to finalize task with uuid '%s' and persist its state to db", task.getUuid()), e);
- } finally {
- stopActivityProfiler(ceProfiler, task, status);
- ceLogging.clearForTask();
- }
- }
-
- private static Profiler startActivityProfiler(CeTask task) {
- Profiler profiler = Profiler.create(LOG);
- addContext(profiler, task);
- return profiler.startInfo("Execute task");
- }
-
- private static void stopActivityProfiler(Profiler profiler, CeTask task, CeActivityDto.Status status) {
- addContext(profiler, task);
- if (status == CeActivityDto.Status.FAILED) {
- profiler.stopError("Executed task");
- } else {
- profiler.stopInfo("Executed task");
- }
- }
-
- private static void addContext(Profiler profiler, CeTask task) {
- profiler
- .logTimeLast(true)
- .addContext("project", task.getComponentKey())
- .addContext("type", task.getType())
- .addContext("id", task.getUuid());
- String submitterLogin = task.getSubmitterLogin();
- if (submitterLogin != null) {
- profiler.addContext("submitter", submitterLogin);
- }
- }
-
-}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.Set;
+
+/**
+ * A factory that will create the CeWorkerFactory with an UUID
+ */
+public interface CeWorkerFactory {
+ /**
+ * Create a new CeWorker object.
+ * Each {@link CeWorker} returned by this method will have a different UUID from the others and all of these UUIDS will be returned by {@link #getWorkerUUIDs()}.
+ *
+ * @return the CeWorker
+ */
+ CeWorker create();
+ /**
+ * @return the UUIDs of each {@link CeWorker} object returned by {@link #create}.
+ */
+ Set<String> getWorkerUUIDs();
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package org.sonar.ce.taskprocessor;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.sonar.ce.log.CeLogging;
+import org.sonar.ce.queue.InternalCeQueue;
+import org.sonar.core.util.UuidFactory;
+
+import static com.google.common.collect.ImmutableSet.copyOf;
+
+public class CeWorkerFactoryImpl implements CeWorkerFactory {
+ private final UuidFactory uuidFactory;
+ private final Set<String> ceWorkerUUIDs = new HashSet<>();
+ private final InternalCeQueue queue;
+ private final CeLogging ceLogging;
+ private final CeTaskProcessorRepository taskProcessorRepository;
+
+ public CeWorkerFactoryImpl(InternalCeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository, UuidFactory uuidFactory) {
+ this.queue = queue;
+ this.ceLogging = ceLogging;
+ this.taskProcessorRepository = taskProcessorRepository;
+ this.uuidFactory= uuidFactory;
+ }
+
+ @Override
+ public CeWorker create() {
+ String uuid = uuidFactory.create();
+ ceWorkerUUIDs.add(uuid);
+ return new CeWorkerImpl(queue, ceLogging, taskProcessorRepository, uuid);
+ }
+
+ @Override
+ public Set<String> getWorkerUUIDs() {
+ return copyOf(ceWorkerUUIDs);
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.ce.log.CeLogging;
+import org.sonar.ce.queue.CeTask;
+import org.sonar.ce.queue.CeTaskResult;
+import org.sonar.ce.queue.InternalCeQueue;
+import org.sonar.core.util.logs.Profiler;
+import org.sonar.db.ce.CeActivityDto;
+
+import static java.lang.String.format;
+
+public class CeWorkerImpl implements CeWorker {
+
+ private static final Logger LOG = Loggers.get(CeWorkerImpl.class);
+
+ private final InternalCeQueue queue;
+ private final CeLogging ceLogging;
+ private final CeTaskProcessorRepository taskProcessorRepository;
+ private final String uuid;
+
+ public CeWorkerImpl(InternalCeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository, String uuid) {
+ this.queue = queue;
+ this.ceLogging = ceLogging;
+ this.taskProcessorRepository = taskProcessorRepository;
+ this.uuid = uuid;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ Optional<CeTask> ceTask = tryAndFindTaskToExecute();
+ if (!ceTask.isPresent()) {
+ return false;
+ }
+
+ try {
+ executeTask(ceTask.get());
+ } catch (Exception e) {
+ LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e);
+ }
+ return true;
+ }
+
+
+ private Optional<CeTask> tryAndFindTaskToExecute() {
+ try {
+ return queue.peek(uuid);
+ } catch (Exception e) {
+ LOG.error("Failed to pop the queue of analysis reports", e);
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public String getUUID() {
+ return uuid;
+ }
+
+ private void executeTask(CeTask task) {
+ ceLogging.initForTask(task);
+ Profiler ceProfiler = startActivityProfiler(task);
+
+ CeActivityDto.Status status = CeActivityDto.Status.FAILED;
+ CeTaskResult taskResult = null;
+ Throwable error = null;
+ try {
+ // TODO delegate the message to the related task processor, according to task type
+ Optional<CeTaskProcessor> taskProcessor = taskProcessorRepository.getForCeTask(task);
+ if (taskProcessor.isPresent()) {
+ taskResult = taskProcessor.get().process(task);
+ status = CeActivityDto.Status.SUCCESS;
+ } else {
+ LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType());
+ status = CeActivityDto.Status.FAILED;
+ }
+ } catch (Throwable e) {
+ LOG.error(format("Failed to execute task %s", task.getUuid()), e);
+ error = e;
+ } finally {
+ finalizeTask(task, ceProfiler, status, taskResult, error);
+ }
+ }
+
+ private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status,
+ @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
+ try {
+ queue.remove(task, status, taskResult, error);
+ } catch (Exception e) {
+ LOG.error(format("Failed to finalize task with uuid '%s' and persist its state to db", task.getUuid()), e);
+ } finally {
+ stopActivityProfiler(ceProfiler, task, status);
+ ceLogging.clearForTask();
+ }
+ }
+
+ private static Profiler startActivityProfiler(CeTask task) {
+ Profiler profiler = Profiler.create(LOG);
+ addContext(profiler, task);
+ return profiler.startInfo("Execute task");
+ }
+
+ private static void stopActivityProfiler(Profiler profiler, CeTask task, CeActivityDto.Status status) {
+ addContext(profiler, task);
+ if (status == CeActivityDto.Status.FAILED) {
+ profiler.stopError("Executed task");
+ } else {
+ profiler.stopInfo("Executed task");
+ }
+ }
+
+ private static void addContext(Profiler profiler, CeTask task) {
+ profiler
+ .logTimeLast(true)
+ .addContext("project", task.getComponentKey())
+ .addContext("type", task.getType())
+ .addContext("id", task.getUuid());
+ String submitterLogin = task.getSubmitterLogin();
+ if (submitterLogin != null) {
+ profiler.addContext("submitter", submitterLogin);
+ }
+ }
+}
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Delayed;
import org.junit.rules.Timeout;
import org.sonar.ce.configuration.CeConfigurationRule;
+import static com.google.common.collect.ImmutableList.copyOf;
+import static java.util.Collections.emptySet;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public Timeout timeout = Timeout.seconds(60);
@Rule
public CeConfigurationRule ceConfiguration = new CeConfigurationRule();
-
- private CeWorkerCallable ceWorkerRunnable = mock(CeWorkerCallable.class);
+ // Required to prevent an infinite loop
+ private CeWorker ceWorker = mock(CeWorker.class);
+ private CeWorkerFactory ceWorkerFactory = new TestCeWorkerFactory(ceWorker);
private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService();
- private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorkerRunnable, 2000L, TimeUnit.MILLISECONDS);
- private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorkerRunnable);
+ private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorker, 2000L, MILLISECONDS);
+ private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorker);
- private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerRunnable);
+ private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory);
@Test
public void polls_without_delay_when_CeWorkerCallable_returns_true() throws Exception {
- when(ceWorkerRunnable.call())
+ when(ceWorker.call())
.thenReturn(true)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
@Test
public void polls_without_delay_when_CeWorkerCallable_throws_Exception_but_not_Error() throws Exception {
- when(ceWorkerRunnable.call())
+ when(ceWorker.call())
.thenThrow(new Exception("Exception is followed by a poll without delay"))
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
@Test
public void polls_with_regular_delay_when_CeWorkerCallable_returns_false() throws Exception {
- when(ceWorkerRunnable.call())
+ when(ceWorker.call())
.thenReturn(false)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
@Test
public void startScheduling_schedules_CeWorkerCallable_at_fixed_rate_run_head_of_queue() throws Exception {
- when(ceWorkerRunnable.call())
+ when(ceWorker.call())
.thenReturn(true)
.thenReturn(true)
.thenReturn(false)
@Test
public void stop_cancels_next_polling_and_does_not_add_any_new_one() throws Exception {
- when(ceWorkerRunnable.call())
+ when(ceWorker.call())
.thenReturn(false)
.thenReturn(true)
.thenReturn(false)
}
@Test
- public void when_workerCount_is_more_than_1_as_many_CeWorkerCallable_are_scheduled() throws InterruptedException {
+ public void when_workerCount_is_more_than_1_as_many_CeWorkerCallable_are_scheduled() throws Exception {
int workerCount = Math.abs(new Random().nextInt(10)) + 1;
-
ceConfiguration.setWorkerCount(workerCount);
+ CeWorker[] workers = new CeWorker[workerCount];
+ for (int i = 0; i < workerCount; i++) {
+ workers[i] = mock(CeWorker.class);
+ when(workers[i].call())
+ .thenReturn(false)
+ .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+ }
+
ListenableScheduledFuture listenableScheduledFuture = mock(ListenableScheduledFuture.class);
CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class);
- CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerRunnable);
- when(processingExecutorService.schedule(ceWorkerRunnable, ceConfiguration.getQueuePollingDelay(), MILLISECONDS))
+ when(processingExecutorService.schedule(any(CeWorker.class), any(Long.class),any(TimeUnit.class))).thenReturn(listenableScheduledFuture);
+
+ CeWorkerFactory ceWorkerFactory = spy(new TestCeWorkerFactory(workers));
+ CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory);
+ when(processingExecutorService.schedule(ceWorker, ceConfiguration.getQueuePollingDelay(), MILLISECONDS))
.thenReturn(listenableScheduledFuture);
underTest.startScheduling();
+ // No exception from TestCeWorkerFactory must be thrown
- verify(processingExecutorService, times(workerCount)).schedule(ceWorkerRunnable, ceConfiguration.getQueuePollingDelay(), MILLISECONDS);
+ // Verify that schedule has been called on all workers
+ for (int i = 0; i < workerCount; i++) {
+ verify(processingExecutorService).schedule(workers[i], ceConfiguration.getQueuePollingDelay(), MILLISECONDS);
+ }
verify(listenableScheduledFuture, times(workerCount)).addListener(any(Runnable.class), eq(processingExecutorService));
+ verify(ceWorkerFactory, times(workerCount)).create();
}
private void startSchedulingAndRun() throws ExecutionException, InterruptedException {
processingExecutorService.runFutures();
}
+ private class TestCeWorkerFactory implements CeWorkerFactory {
+ private final Iterator<CeWorker> ceWorkers;
+
+ private TestCeWorkerFactory(CeWorker... ceWorkers) {
+ this.ceWorkers = copyOf(ceWorkers).iterator();
+ }
+
+ @Override
+ public CeWorker create() {
+ // This will throw an NoSuchElementException if there are too many calls
+ return ceWorkers.next();
+ }
+
+ @Override
+ public Set<String> getWorkerUUIDs() {
+ return emptySet();
+ }
+ }
+
/**
* A synchronous implementation of {@link CeProcessingSchedulerExecutorService} which exposes a synchronous
* method to execute futures it creates and exposes a method to retrieve logs of calls to
command.run();
}
- // ///////// unsupported operations ///////////
+ /////////// unsupported operations ///////////
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
'}';
}
}
-
}
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2017 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.ce.taskprocessor;
-
-import java.util.List;
-import java.util.Optional;
-import javax.annotation.Nullable;
-import org.junit.Rule;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.InOrder;
-import org.mockito.Mockito;
-import org.sonar.api.utils.log.LogTester;
-import org.sonar.api.utils.log.LoggerLevel;
-import org.sonar.ce.log.CeLogging;
-import org.sonar.ce.queue.CeTask;
-import org.sonar.ce.queue.InternalCeQueue;
-import org.sonar.db.ce.CeActivityDto;
-import org.sonar.db.ce.CeTaskTypes;
-import org.sonar.server.computation.task.projectanalysis.taskprocessor.ReportTaskProcessor;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-public class CeWorkerCallableImplTest {
-
- @Rule
- public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule();
- @Rule
- public LogTester logTester = new LogTester();
-
- private InternalCeQueue queue = mock(InternalCeQueue.class);
- private ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
- private CeLogging ceLogging = spy(CeLogging.class);
- private ArgumentCaptor<String> workerUuid = ArgumentCaptor.forClass(String.class);
- private CeWorkerCallable underTest = new CeWorkerCallableImpl(queue, ceLogging, taskProcessorRepository);
- private InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
-
- @Test
- public void no_pending_tasks_in_queue() throws Exception {
- when(queue.peek(anyString())).thenReturn(Optional.empty());
-
- assertThat(underTest.call()).isFalse();
-
- verifyZeroInteractions(taskProcessor, ceLogging);
- }
-
- @Test
- public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception {
- CeTask task = createCeTask(null);
- taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
- when(queue.peek(anyString())).thenReturn(Optional.of(task));
-
- assertThat(underTest.call()).isTrue();
-
- verifyWorkerUuid();
- inOrder.verify(ceLogging).initForTask(task);
- inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, null);
- inOrder.verify(ceLogging).clearForTask();
- }
-
- @Test
- public void peek_and_process_task() throws Exception {
- CeTask task = createCeTask(null);
- taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
- when(queue.peek(anyString())).thenReturn(Optional.of(task));
-
- assertThat(underTest.call()).isTrue();
-
- verifyWorkerUuid();
- inOrder.verify(ceLogging).initForTask(task);
- inOrder.verify(taskProcessor).process(task);
- inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS, null, null);
- inOrder.verify(ceLogging).clearForTask();
- }
-
- @Test
- public void fail_to_process_task() throws Exception {
- CeTask task = createCeTask(null);
- when(queue.peek(anyString())).thenReturn(Optional.of(task));
- taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
- Throwable error = makeTaskProcessorFail(task);
-
- assertThat(underTest.call()).isTrue();
-
- verifyWorkerUuid();
- inOrder.verify(ceLogging).initForTask(task);
- inOrder.verify(taskProcessor).process(task);
- inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, error);
- inOrder.verify(ceLogging).clearForTask();
- }
-
- @Test
- public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_success() throws Exception {
- when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask(null)));
- taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
-
- underTest.call();
-
- verifyWorkerUuid();
- List<String> logs = logTester.logs(LoggerLevel.INFO);
- assertThat(logs).hasSize(2);
- for (int i = 0; i < 2; i++) {
- assertThat(logs.get(i)).doesNotContain(" | submitter=");
- }
- }
-
- @Test
- public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_error() throws Exception {
- CeTask ceTask = createCeTask(null);
- when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
- taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
- makeTaskProcessorFail(ceTask);
-
- underTest.call();
-
- verifyWorkerUuid();
- List<String> logs = logTester.logs(LoggerLevel.INFO);
- assertThat(logs).hasSize(1);
- assertThat(logs.get(0)).doesNotContain(" | submitter=");
- logs = logTester.logs(LoggerLevel.ERROR);
- assertThat(logs).hasSize(2);
- for (int i = 0; i < 2; i++) {
- assertThat(logs.get(i)).doesNotContain(" | submitter=");
- }
- assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
- }
-
- @Test
- public void display_submitterLogin_in_logs_when_set_in_case_of_success() throws Exception {
- when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask("FooBar")));
- taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
-
- underTest.call();
-
- verifyWorkerUuid();
- List<String> logs = logTester.logs(LoggerLevel.INFO);
- assertThat(logs).hasSize(2);
- assertThat(logs.get(0)).contains(" | submitter=FooBar");
- assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
- assertThat(logTester.logs(LoggerLevel.ERROR)).isEmpty();
- assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
- }
-
- @Test
- public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception {
- CeTask ceTask = createCeTask("FooBar");
- when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
- taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
- makeTaskProcessorFail(ceTask);
-
- underTest.call();
-
- verifyWorkerUuid();
- List<String> logs = logTester.logs(LoggerLevel.INFO);
- assertThat(logs).hasSize(1);
- assertThat(logs.iterator().next()).contains(" | submitter=FooBar");
- logs = logTester.logs(LoggerLevel.ERROR);
- assertThat(logs).hasSize(2);
- assertThat(logs.get(0)).isEqualTo("Failed to execute task " + ceTask.getUuid());
- assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
- }
-
- @Test
- public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception {
- logTester.setLevel(LoggerLevel.DEBUG);
-
- when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask("FooBar")));
- taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
-
- underTest.call();
-
- verifyWorkerUuid();
- List<String> logs = logTester.logs(LoggerLevel.INFO);
- assertThat(logs).hasSize(2);
- assertThat(logs.get(0)).contains(" | submitter=FooBar");
- assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
- assertThat(logTester.logs(LoggerLevel.ERROR)).isEmpty();
- assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
- }
-
- @Test
- public void display_start_at_debug_level_stop_at_error_level_for_console_if_DEBUG_is_enabled_and_task_failed() throws Exception {
- logTester.setLevel(LoggerLevel.DEBUG);
-
- CeTask ceTask = createCeTask("FooBar");
- when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
- taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
- makeTaskProcessorFail(ceTask);
-
- underTest.call();
-
- verifyWorkerUuid();
- List<String> logs = logTester.logs(LoggerLevel.INFO);
- assertThat(logs).hasSize(1);
- assertThat(logs.iterator().next()).contains(" | submitter=FooBar");
- logs = logTester.logs(LoggerLevel.ERROR);
- assertThat(logs).hasSize(2);
- assertThat(logs.get(0)).isEqualTo("Failed to execute task " + ceTask.getUuid());
- assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
- assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
- }
-
- private void verifyWorkerUuid() {
- verify(queue).peek(workerUuid.capture());
- assertThat(workerUuid.getValue()).startsWith("uuid");
- }
-
- private static CeTask createCeTask(@Nullable String submitterLogin) {
- return new CeTask.Builder()
- .setOrganizationUuid("org1")
- .setUuid("TASK_1").setType(CeTaskTypes.REPORT)
- .setComponentUuid("PROJECT_1")
- .setSubmitterLogin(submitterLogin)
- .build();
- }
-
- private IllegalStateException makeTaskProcessorFail(CeTask task) {
- IllegalStateException error = new IllegalStateException("simulate exception thrown by TaskProcessor#process");
- doThrow(error).when(taskProcessor).process(task);
- return error;
- }
-}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package org.sonar.ce.taskprocessor;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.junit.Test;
+import org.sonar.ce.log.CeLogging;
+import org.sonar.ce.queue.InternalCeQueue;
+import org.sonar.core.util.UuidFactoryImpl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class CeWorkerFactoryImplTest {
+ private CeWorkerFactoryImpl underTest = new CeWorkerFactoryImpl(mock(InternalCeQueue.class), mock(CeLogging.class),
+ mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE);
+
+ @Test
+ public void each_call_must_return_a_new_ceworker_with_unique_uuid() {
+ Set<CeWorker> ceWorkers = new HashSet<>();
+ Set<String> ceWorkerUUIDs = new HashSet<>();
+
+ for (int i = 0; i < 10; i++) {
+ CeWorker ceWorker = underTest.create();
+ ceWorkers.add(ceWorker);
+ ceWorkerUUIDs.add(ceWorker.getUUID());
+ }
+
+ assertThat(ceWorkers).hasSize(10);
+ assertThat(ceWorkerUUIDs).hasSize(10);
+ }
+
+ @Test
+ public void ceworker_created_by_factory_must_contain_uuid() {
+ CeWorker ceWorker = underTest.create();
+ assertThat(ceWorker.getUUID()).isNotEmpty();
+ }
+
+ @Test
+ public void CeWorkerFactory_has_an_empty_set_of_uuids_when_created() {
+ assertThat(underTest.getWorkerUUIDs()).isEmpty();
+ }
+
+ @Test
+ public void CeWorkerFactory_must_returns_the_uuids_of_worker() {
+ Set<String> ceWorkerUUIDs = new HashSet<>();
+
+ for (int i = 0; i < 10; i++) {
+ ceWorkerUUIDs.add(underTest.create().getUUID());
+ }
+
+ assertThat(underTest.getWorkerUUIDs()).isEqualTo(ceWorkerUUIDs);
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.sonar.api.utils.log.LogTester;
+import org.sonar.api.utils.log.LoggerLevel;
+import org.sonar.ce.log.CeLogging;
+import org.sonar.ce.queue.CeTask;
+import org.sonar.ce.queue.InternalCeQueue;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.server.computation.task.projectanalysis.taskprocessor.ReportTaskProcessor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+public class CeWorkerImplTest {
+
+ @Rule
+ public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule();
+ @Rule
+ public LogTester logTester = new LogTester();
+
+ private InternalCeQueue queue = mock(InternalCeQueue.class);
+ private ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
+ private CeLogging ceLogging = spy(CeLogging.class);
+ private ArgumentCaptor<String> workerUuid = ArgumentCaptor.forClass(String.class);
+ private CeWorker underTest = new CeWorkerImpl(queue, ceLogging, taskProcessorRepository, UUID.randomUUID().toString());
+ private InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
+
+ @Test
+ public void getUUID_must_return_the_uuid_of_constructor() {
+ String uuid = UUID.randomUUID().toString();
+ CeWorker underTest = new CeWorkerImpl(queue, ceLogging, taskProcessorRepository, uuid);
+ assertThat(underTest.getUUID()).isEqualTo(uuid);
+ }
+
+ @Test
+ public void no_pending_tasks_in_queue() throws Exception {
+ when(queue.peek(anyString())).thenReturn(Optional.empty());
+
+ assertThat(underTest.call()).isFalse();
+
+ verifyZeroInteractions(taskProcessor, ceLogging);
+ }
+
+ @Test
+ public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception {
+ CeTask task = createCeTask(null);
+ taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
+ when(queue.peek(anyString())).thenReturn(Optional.of(task));
+
+ assertThat(underTest.call()).isTrue();
+
+ verifyWorkerUuid();
+ inOrder.verify(ceLogging).initForTask(task);
+ inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, null);
+ inOrder.verify(ceLogging).clearForTask();
+ }
+
+ @Test
+ public void peek_and_process_task() throws Exception {
+ CeTask task = createCeTask(null);
+ taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
+ when(queue.peek(anyString())).thenReturn(Optional.of(task));
+
+ assertThat(underTest.call()).isTrue();
+
+ verifyWorkerUuid();
+ inOrder.verify(ceLogging).initForTask(task);
+ inOrder.verify(taskProcessor).process(task);
+ inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS, null, null);
+ inOrder.verify(ceLogging).clearForTask();
+ }
+
+ @Test
+ public void fail_to_process_task() throws Exception {
+ CeTask task = createCeTask(null);
+ when(queue.peek(anyString())).thenReturn(Optional.of(task));
+ taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
+ Throwable error = makeTaskProcessorFail(task);
+
+ assertThat(underTest.call()).isTrue();
+
+ verifyWorkerUuid();
+ inOrder.verify(ceLogging).initForTask(task);
+ inOrder.verify(taskProcessor).process(task);
+ inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, error);
+ inOrder.verify(ceLogging).clearForTask();
+ }
+
+ @Test
+ public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_success() throws Exception {
+ when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask(null)));
+ taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
+
+ underTest.call();
+
+ verifyWorkerUuid();
+ List<String> logs = logTester.logs(LoggerLevel.INFO);
+ assertThat(logs).hasSize(2);
+ for (int i = 0; i < 2; i++) {
+ assertThat(logs.get(i)).doesNotContain(" | submitter=");
+ }
+ }
+
+ @Test
+ public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_error() throws Exception {
+ CeTask ceTask = createCeTask(null);
+ when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
+ makeTaskProcessorFail(ceTask);
+
+ underTest.call();
+
+ verifyWorkerUuid();
+ List<String> logs = logTester.logs(LoggerLevel.INFO);
+ assertThat(logs).hasSize(1);
+ assertThat(logs.get(0)).doesNotContain(" | submitter=");
+ logs = logTester.logs(LoggerLevel.ERROR);
+ assertThat(logs).hasSize(2);
+ for (int i = 0; i < 2; i++) {
+ assertThat(logs.get(i)).doesNotContain(" | submitter=");
+ }
+ assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
+ }
+
+ @Test
+ public void display_submitterLogin_in_logs_when_set_in_case_of_success() throws Exception {
+ when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask("FooBar")));
+ taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
+
+ underTest.call();
+
+ verifyWorkerUuid();
+ List<String> logs = logTester.logs(LoggerLevel.INFO);
+ assertThat(logs).hasSize(2);
+ assertThat(logs.get(0)).contains(" | submitter=FooBar");
+ assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
+ assertThat(logTester.logs(LoggerLevel.ERROR)).isEmpty();
+ assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
+ }
+
+ @Test
+ public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception {
+ CeTask ceTask = createCeTask("FooBar");
+ when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
+ makeTaskProcessorFail(ceTask);
+
+ underTest.call();
+
+ verifyWorkerUuid();
+ List<String> logs = logTester.logs(LoggerLevel.INFO);
+ assertThat(logs).hasSize(1);
+ assertThat(logs.iterator().next()).contains(" | submitter=FooBar");
+ logs = logTester.logs(LoggerLevel.ERROR);
+ assertThat(logs).hasSize(2);
+ assertThat(logs.get(0)).isEqualTo("Failed to execute task " + ceTask.getUuid());
+ assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
+ }
+
+ @Test
+ public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception {
+ logTester.setLevel(LoggerLevel.DEBUG);
+
+ when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask("FooBar")));
+ taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
+
+ underTest.call();
+
+ verifyWorkerUuid();
+ List<String> logs = logTester.logs(LoggerLevel.INFO);
+ assertThat(logs).hasSize(2);
+ assertThat(logs.get(0)).contains(" | submitter=FooBar");
+ assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
+ assertThat(logTester.logs(LoggerLevel.ERROR)).isEmpty();
+ assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
+ }
+
+ @Test
+ public void display_start_at_debug_level_stop_at_error_level_for_console_if_DEBUG_is_enabled_and_task_failed() throws Exception {
+ logTester.setLevel(LoggerLevel.DEBUG);
+
+ CeTask ceTask = createCeTask("FooBar");
+ when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
+ makeTaskProcessorFail(ceTask);
+
+ underTest.call();
+
+ verifyWorkerUuid();
+ List<String> logs = logTester.logs(LoggerLevel.INFO);
+ assertThat(logs).hasSize(1);
+ assertThat(logs.iterator().next()).contains(" | submitter=FooBar");
+ logs = logTester.logs(LoggerLevel.ERROR);
+ assertThat(logs).hasSize(2);
+ assertThat(logs.get(0)).isEqualTo("Failed to execute task " + ceTask.getUuid());
+ assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
+ assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
+ }
+
+ private void verifyWorkerUuid() {
+ verify(queue).peek(workerUuid.capture());
+ assertThat(workerUuid.getValue()).startsWith(workerUuid.getValue());
+ }
+
+ private static CeTask createCeTask(@Nullable String submitterLogin) {
+ return new CeTask.Builder()
+ .setOrganizationUuid("org1")
+ .setUuid("TASK_1").setType(CeTaskTypes.REPORT)
+ .setComponentUuid("PROJECT_1")
+ .setSubmitterLogin(submitterLogin)
+ .build();
+ }
+
+ private IllegalStateException makeTaskProcessorFail(CeTask task) {
+ IllegalStateException error = new IllegalStateException("simulate exception thrown by TaskProcessor#process");
+ doThrow(error).when(taskProcessor).process(task);
+ return error;
+ }
+}