From: Sébastien Lesaint Date: Mon, 7 Dec 2015 09:10:45 +0000 (+0100) Subject: SONAR-7088 rename CeWorkerRunnable to CeWorkerCallable X-Git-Tag: 5.3-RC1~47 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=refs%2Fpull%2F682%2Fhead;p=sonarqube.git SONAR-7088 rename CeWorkerRunnable to CeWorkerCallable --- diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorService.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorService.java index 33d02bf48aa..76d062998b4 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorService.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorService.java @@ -23,7 +23,7 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService; import org.sonar.server.util.StoppableExecutorService; /** - * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerRunnableImpl}. + * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerCallableImpl}. */ public interface CeProcessingSchedulerExecutorService extends StoppableExecutorService, ListeningScheduledExecutorService { } diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImpl.java index a5040317030..aa034173e8d 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImpl.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImpl.java @@ -38,14 +38,14 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class); private final CeProcessingSchedulerExecutorService executorService; - private final CeWorkerRunnable workerRunnable; + private final CeWorkerCallable workerRunnable; private final long delayBetweenTasks; private final TimeUnit timeUnit; // warning: using a single ChainingCallback object for chaining works and is thread safe only because we use a single Thread in CeProcessingSchedulerExecutorService private final ChainingCallback chainingCallback = new ChainingCallback(); - public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerRunnable workerRunnable) { + public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerCallable workerRunnable) { this.executorService = processingExecutorService; this.workerRunnable = workerRunnable; diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorModule.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorModule.java index 8ae68bd4789..77fd53627e5 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorModule.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorModule.java @@ -26,7 +26,7 @@ public class CeTaskProcessorModule extends Module { protected void configureModule() { add( CeTaskProcessorRepositoryImpl.class, - CeWorkerRunnableImpl.class, + CeWorkerCallableImpl.class, CeProcessingSchedulerExecutorServiceImpl.class, CeProcessingSchedulerImpl.class); } diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerCallable.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerCallable.java new file mode 100644 index 00000000000..6c75a5e4070 --- /dev/null +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerCallable.java @@ -0,0 +1,30 @@ +/* + * SonarQube, open source software quality management tool. + * Copyright (C) 2008-2014 SonarSource + * mailto:contact AT sonarsource DOT com + * + * SonarQube is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * SonarQube is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.server.computation.taskprocessor; + +import java.util.concurrent.Callable; +import org.sonar.server.computation.queue.CeQueue; +import org.sonar.server.computation.queue.CeTask; + +/** + * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}. + */ +public interface CeWorkerCallable extends Callable { +} diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerCallableImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerCallableImpl.java new file mode 100644 index 00000000000..091bb7cdc48 --- /dev/null +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerCallableImpl.java @@ -0,0 +1,108 @@ +/* + * SonarQube, open source software quality management tool. + * Copyright (C) 2008-2014 SonarSource + * mailto:contact AT sonarsource DOT com + * + * SonarQube is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * SonarQube is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package org.sonar.server.computation.taskprocessor; + +import com.google.common.base.Optional; +import org.sonar.api.utils.log.Logger; +import org.sonar.api.utils.log.Loggers; +import org.sonar.core.util.logs.Profiler; +import org.sonar.db.ce.CeActivityDto; +import org.sonar.server.computation.log.CeLogging; +import org.sonar.server.computation.queue.CeQueue; +import org.sonar.server.computation.queue.CeTask; + +import static java.lang.String.format; + +public class CeWorkerCallableImpl implements CeWorkerCallable { + + private static final Logger LOG = Loggers.get(CeWorkerCallableImpl.class); + + private final CeQueue queue; + private final CeLogging ceLogging; + private final CeTaskProcessorRepository taskProcessorRepository; + + public CeWorkerCallableImpl(CeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository) { + this.queue = queue; + this.ceLogging = ceLogging; + this.taskProcessorRepository = taskProcessorRepository; + } + + @Override + public Boolean call() throws Exception { + Optional ceTask = tryAndFindTaskToExecute(); + if (!ceTask.isPresent()) { + return false; + } + + executeTask(ceTask.get()); + return true; + } + + private Optional tryAndFindTaskToExecute() { + try { + return queue.peek(); + } catch (Exception e) { + LOG.error("Failed to pop the queue of analysis reports", e); + } + return Optional.absent(); + } + + private void executeTask(CeTask task) { + // logging twice: once in sonar.log and once in CE appender + Profiler regularProfiler = startProfiler(task); + ceLogging.initForTask(task); + Profiler ceProfiler = startProfiler(task); + + CeActivityDto.Status status = CeActivityDto.Status.FAILED; + try { + // TODO delegate the message to the related task processor, according to task type + Optional taskProcessor = taskProcessorRepository.getForCeTask(task); + if (taskProcessor.isPresent()) { + taskProcessor.get().process(task); + status = CeActivityDto.Status.SUCCESS; + } else { + LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType()); + status = CeActivityDto.Status.FAILED; + } + queue.remove(task, status); + } catch (Throwable e) { + LOG.error(format("Failed to execute task %s", task.getUuid()), e); + queue.remove(task, status); + } finally { + // logging twice: once in sonar.log and once in CE appender + stopProfiler(ceProfiler, task, status); + ceLogging.clearForTask(); + stopProfiler(regularProfiler, task, status); + } + } + + private static Profiler startProfiler(CeTask task) { + return Profiler.create(LOG).startInfo("Execute task | project={} | id={}", task.getComponentKey(), task.getUuid()); + } + + private static void stopProfiler(Profiler profiler, CeTask task, CeActivityDto.Status status) { + if (status == CeActivityDto.Status.FAILED) { + profiler.stopError("Executed task | project={} | id={}", task.getComponentKey(), task.getUuid()); + } else { + profiler.stopInfo("Executed task | project={} | id={}", task.getComponentKey(), task.getUuid()); + } + } +} diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnable.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnable.java deleted file mode 100644 index d6c8fb86b89..00000000000 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnable.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * SonarQube, open source software quality management tool. - * Copyright (C) 2008-2014 SonarSource - * mailto:contact AT sonarsource DOT com - * - * SonarQube is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 3 of the License, or (at your option) any later version. - * - * SonarQube is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ -package org.sonar.server.computation.taskprocessor; - -import java.util.concurrent.Callable; -import org.sonar.server.computation.queue.CeQueue; -import org.sonar.server.computation.queue.CeTask; - -/** - * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}. - */ -public interface CeWorkerRunnable extends Callable { -} diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImpl.java deleted file mode 100644 index a7b28ee9367..00000000000 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImpl.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * SonarQube, open source software quality management tool. - * Copyright (C) 2008-2014 SonarSource - * mailto:contact AT sonarsource DOT com - * - * SonarQube is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 3 of the License, or (at your option) any later version. - * - * SonarQube is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package org.sonar.server.computation.taskprocessor; - -import com.google.common.base.Optional; -import org.sonar.api.utils.log.Logger; -import org.sonar.api.utils.log.Loggers; -import org.sonar.core.util.logs.Profiler; -import org.sonar.db.ce.CeActivityDto; -import org.sonar.server.computation.log.CeLogging; -import org.sonar.server.computation.queue.CeQueue; -import org.sonar.server.computation.queue.CeTask; - -import static java.lang.String.format; - -public class CeWorkerRunnableImpl implements CeWorkerRunnable { - - private static final Logger LOG = Loggers.get(CeWorkerRunnableImpl.class); - - private final CeQueue queue; - private final CeLogging ceLogging; - private final CeTaskProcessorRepository taskProcessorRepository; - - public CeWorkerRunnableImpl(CeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository) { - this.queue = queue; - this.ceLogging = ceLogging; - this.taskProcessorRepository = taskProcessorRepository; - } - - @Override - public Boolean call() throws Exception { - Optional ceTask = tryAndFindTaskToExecute(); - if (!ceTask.isPresent()) { - return false; - } - - executeTask(ceTask.get()); - return true; - } - - private Optional tryAndFindTaskToExecute() { - try { - return queue.peek(); - } catch (Exception e) { - LOG.error("Failed to pop the queue of analysis reports", e); - } - return Optional.absent(); - } - - private void executeTask(CeTask task) { - // logging twice: once in sonar.log and once in CE appender - Profiler regularProfiler = startProfiler(task); - ceLogging.initForTask(task); - Profiler ceProfiler = startProfiler(task); - - CeActivityDto.Status status = CeActivityDto.Status.FAILED; - try { - // TODO delegate the message to the related task processor, according to task type - Optional taskProcessor = taskProcessorRepository.getForCeTask(task); - if (taskProcessor.isPresent()) { - taskProcessor.get().process(task); - status = CeActivityDto.Status.SUCCESS; - } else { - LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType()); - status = CeActivityDto.Status.FAILED; - } - queue.remove(task, status); - } catch (Throwable e) { - LOG.error(format("Failed to execute task %s", task.getUuid()), e); - queue.remove(task, status); - } finally { - // logging twice: once in sonar.log and once in CE appender - stopProfiler(ceProfiler, task, status); - ceLogging.clearForTask(); - stopProfiler(regularProfiler, task, status); - } - } - - private static Profiler startProfiler(CeTask task) { - return Profiler.create(LOG).startInfo("Execute task | project={} | id={}", task.getComponentKey(), task.getUuid()); - } - - private static void stopProfiler(Profiler profiler, CeTask task, CeActivityDto.Status status) { - if (status == CeActivityDto.Status.FAILED) { - profiler.stopError("Executed task | project={} | id={}", task.getComponentKey(), task.getUuid()); - } else { - profiler.stopInfo("Executed task | project={} | id={}", task.getComponentKey(), task.getUuid()); - } - } -} diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImplTest.java index 82b412fd43f..d2c33c422a0 100644 --- a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImplTest.java +++ b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImplTest.java @@ -54,7 +54,7 @@ public class CeProcessingSchedulerImplTest { // due to risks of infinite chaining of tasks/futures, a timeout is required for safety public Timeout timeout = Timeout.seconds(60); - private CeWorkerRunnable ceWorkerRunnable = mock(CeWorkerRunnable.class); + private CeWorkerCallable ceWorkerRunnable = mock(CeWorkerCallable.class); private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService(); private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorkerRunnable, 2L, TimeUnit.SECONDS); private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorkerRunnable); @@ -62,7 +62,7 @@ public class CeProcessingSchedulerImplTest { private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, ceWorkerRunnable); @Test - public void polls_without_delay_when_CeWorkerRunnable_returns_true() throws Exception { + public void polls_without_delay_when_CeWorkerCallable_returns_true() throws Exception { when(ceWorkerRunnable.call()) .thenReturn(true) .thenThrow(ERROR_TO_INTERRUPT_CHAINING); @@ -76,7 +76,7 @@ public class CeProcessingSchedulerImplTest { } @Test - public void polls_without_delay_when_CeWorkerRunnable_throws_Exception_but_not_Error() throws Exception { + public void polls_without_delay_when_CeWorkerCallable_throws_Exception_but_not_Error() throws Exception { when(ceWorkerRunnable.call()) .thenThrow(new Exception("Exception is followed by a poll without delay")) .thenThrow(ERROR_TO_INTERRUPT_CHAINING); @@ -90,7 +90,7 @@ public class CeProcessingSchedulerImplTest { } @Test - public void polls_with_regular_delay_when_CeWorkerRunnable_returns_false() throws Exception { + public void polls_with_regular_delay_when_CeWorkerCallable_returns_false() throws Exception { when(ceWorkerRunnable.call()) .thenReturn(false) .thenThrow(ERROR_TO_INTERRUPT_CHAINING); @@ -104,7 +104,7 @@ public class CeProcessingSchedulerImplTest { } @Test - public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() throws Exception { + public void startScheduling_schedules_CeWorkerCallable_at_fixed_rate_run_head_of_queue() throws Exception { when(ceWorkerRunnable.call()) .thenReturn(true) .thenReturn(true) diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerCallableImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerCallableImplTest.java new file mode 100644 index 00000000000..331ceaeac14 --- /dev/null +++ b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerCallableImplTest.java @@ -0,0 +1,102 @@ +/* + * SonarQube, open source software quality management tool. + * Copyright (C) 2008-2014 SonarSource + * mailto:contact AT sonarsource DOT com + * + * SonarQube is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * SonarQube is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.server.computation.taskprocessor; + +import com.google.common.base.Optional; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; +import org.sonar.db.ce.CeActivityDto; +import org.sonar.db.ce.CeTaskTypes; +import org.sonar.server.computation.log.CeLogging; +import org.sonar.server.computation.queue.CeQueue; +import org.sonar.server.computation.queue.CeQueueImpl; +import org.sonar.server.computation.queue.CeTask; +import org.sonar.server.computation.taskprocessor.report.ReportTaskProcessor; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class CeWorkerCallableImplTest { + + @Rule + public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule(); + + CeQueue queue = mock(CeQueueImpl.class); + ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class); + CeLogging ceLogging = mock(CeLogging.class); + CeWorkerCallable underTest = new CeWorkerCallableImpl(queue, ceLogging, taskProcessorRepository); + InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue); + + @Test + public void no_pending_tasks_in_queue() throws Exception { + when(queue.peek()).thenReturn(Optional.absent()); + + assertThat(underTest.call()).isFalse(); + + verifyZeroInteractions(taskProcessor, ceLogging); + } + + @Test + public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception { + CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build(); + taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT); + when(queue.peek()).thenReturn(Optional.of(task)); + + assertThat(underTest.call()).isTrue(); + + inOrder.verify(ceLogging).initForTask(task); + inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED); + inOrder.verify(ceLogging).clearForTask(); + } + + @Test + public void peek_and_process_task() throws Exception { + CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build(); + taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); + when(queue.peek()).thenReturn(Optional.of(task)); + + assertThat(underTest.call()).isTrue(); + + inOrder.verify(ceLogging).initForTask(task); + inOrder.verify(taskProcessor).process(task); + inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS); + inOrder.verify(ceLogging).clearForTask(); + } + + @Test + public void fail_to_process_task() throws Exception { + CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build(); + when(queue.peek()).thenReturn(Optional.of(task)); + taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); + doThrow(new IllegalStateException("simulate exception thrown by TaskProcessor#process")).when(taskProcessor).process(task); + + assertThat(underTest.call()).isTrue(); + + inOrder.verify(ceLogging).initForTask(task); + inOrder.verify(taskProcessor).process(task); + inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED); + inOrder.verify(ceLogging).clearForTask(); + } +} diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImplTest.java deleted file mode 100644 index 305c5e321d4..00000000000 --- a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImplTest.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * SonarQube, open source software quality management tool. - * Copyright (C) 2008-2014 SonarSource - * mailto:contact AT sonarsource DOT com - * - * SonarQube is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 3 of the License, or (at your option) any later version. - * - * SonarQube is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ -package org.sonar.server.computation.taskprocessor; - -import com.google.common.base.Optional; -import org.junit.Rule; -import org.junit.Test; -import org.mockito.InOrder; -import org.mockito.Mockito; -import org.sonar.db.ce.CeActivityDto; -import org.sonar.db.ce.CeTaskTypes; -import org.sonar.server.computation.log.CeLogging; -import org.sonar.server.computation.queue.CeQueue; -import org.sonar.server.computation.queue.CeQueueImpl; -import org.sonar.server.computation.queue.CeTask; -import org.sonar.server.computation.taskprocessor.report.ReportTaskProcessor; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; - -public class CeWorkerRunnableImplTest { - - @Rule - public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule(); - - CeQueue queue = mock(CeQueueImpl.class); - ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class); - CeLogging ceLogging = mock(CeLogging.class); - CeWorkerRunnable underTest = new CeWorkerRunnableImpl(queue, ceLogging, taskProcessorRepository); - InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue); - - @Test - public void no_pending_tasks_in_queue() throws Exception { - when(queue.peek()).thenReturn(Optional.absent()); - - assertThat(underTest.call()).isFalse(); - - verifyZeroInteractions(taskProcessor, ceLogging); - } - - @Test - public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception { - CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build(); - taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT); - when(queue.peek()).thenReturn(Optional.of(task)); - - assertThat(underTest.call()).isTrue(); - - inOrder.verify(ceLogging).initForTask(task); - inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED); - inOrder.verify(ceLogging).clearForTask(); - } - - @Test - public void peek_and_process_task() throws Exception { - CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build(); - taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); - when(queue.peek()).thenReturn(Optional.of(task)); - - assertThat(underTest.call()).isTrue(); - - inOrder.verify(ceLogging).initForTask(task); - inOrder.verify(taskProcessor).process(task); - inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS); - inOrder.verify(ceLogging).clearForTask(); - } - - @Test - public void fail_to_process_task() throws Exception { - CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build(); - when(queue.peek()).thenReturn(Optional.of(task)); - taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); - doThrow(new IllegalStateException("simulate exception thrown by TaskProcessor#process")).when(taskProcessor).process(task); - - assertThat(underTest.call()).isTrue(); - - inOrder.verify(ceLogging).initForTask(task); - inOrder.verify(taskProcessor).process(task); - inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED); - inOrder.verify(ceLogging).clearForTask(); - } -}