import org.sonar.server.util.StoppableExecutorService;
/**
- * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerRunnableImpl}.
+ * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerCallableImpl}.
*/
public interface CeProcessingSchedulerExecutorService extends StoppableExecutorService, ListeningScheduledExecutorService {
}
private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class);
private final CeProcessingSchedulerExecutorService executorService;
- private final CeWorkerRunnable workerRunnable;
+ private final CeWorkerCallable workerRunnable;
private final long delayBetweenTasks;
private final TimeUnit timeUnit;
// warning: using a single ChainingCallback object for chaining works and is thread safe only because we use a single Thread in CeProcessingSchedulerExecutorService
private final ChainingCallback chainingCallback = new ChainingCallback();
- public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerRunnable workerRunnable) {
+ public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerCallable workerRunnable) {
this.executorService = processingExecutorService;
this.workerRunnable = workerRunnable;
protected void configureModule() {
add(
CeTaskProcessorRepositoryImpl.class,
- CeWorkerRunnableImpl.class,
+ CeWorkerCallableImpl.class,
CeProcessingSchedulerExecutorServiceImpl.class,
CeProcessingSchedulerImpl.class);
}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import java.util.concurrent.Callable;
+import org.sonar.server.computation.queue.CeQueue;
+import org.sonar.server.computation.queue.CeTask;
+
+/**
+ * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}.
+ */
+public interface CeWorkerCallable extends Callable<Boolean> {
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package org.sonar.server.computation.taskprocessor;
+
+import com.google.common.base.Optional;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.core.util.logs.Profiler;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.server.computation.log.CeLogging;
+import org.sonar.server.computation.queue.CeQueue;
+import org.sonar.server.computation.queue.CeTask;
+
+import static java.lang.String.format;
+
+public class CeWorkerCallableImpl implements CeWorkerCallable {
+
+ private static final Logger LOG = Loggers.get(CeWorkerCallableImpl.class);
+
+ private final CeQueue queue;
+ private final CeLogging ceLogging;
+ private final CeTaskProcessorRepository taskProcessorRepository;
+
+ public CeWorkerCallableImpl(CeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository) {
+ this.queue = queue;
+ this.ceLogging = ceLogging;
+ this.taskProcessorRepository = taskProcessorRepository;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ Optional<CeTask> ceTask = tryAndFindTaskToExecute();
+ if (!ceTask.isPresent()) {
+ return false;
+ }
+
+ executeTask(ceTask.get());
+ return true;
+ }
+
+ private Optional<CeTask> tryAndFindTaskToExecute() {
+ try {
+ return queue.peek();
+ } catch (Exception e) {
+ LOG.error("Failed to pop the queue of analysis reports", e);
+ }
+ return Optional.absent();
+ }
+
+ private void executeTask(CeTask task) {
+ // logging twice: once in sonar.log and once in CE appender
+ Profiler regularProfiler = startProfiler(task);
+ ceLogging.initForTask(task);
+ Profiler ceProfiler = startProfiler(task);
+
+ CeActivityDto.Status status = CeActivityDto.Status.FAILED;
+ try {
+ // TODO delegate the message to the related task processor, according to task type
+ Optional<CeTaskProcessor> taskProcessor = taskProcessorRepository.getForCeTask(task);
+ if (taskProcessor.isPresent()) {
+ taskProcessor.get().process(task);
+ status = CeActivityDto.Status.SUCCESS;
+ } else {
+ LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType());
+ status = CeActivityDto.Status.FAILED;
+ }
+ queue.remove(task, status);
+ } catch (Throwable e) {
+ LOG.error(format("Failed to execute task %s", task.getUuid()), e);
+ queue.remove(task, status);
+ } finally {
+ // logging twice: once in sonar.log and once in CE appender
+ stopProfiler(ceProfiler, task, status);
+ ceLogging.clearForTask();
+ stopProfiler(regularProfiler, task, status);
+ }
+ }
+
+ private static Profiler startProfiler(CeTask task) {
+ return Profiler.create(LOG).startInfo("Execute task | project={} | id={}", task.getComponentKey(), task.getUuid());
+ }
+
+ private static void stopProfiler(Profiler profiler, CeTask task, CeActivityDto.Status status) {
+ if (status == CeActivityDto.Status.FAILED) {
+ profiler.stopError("Executed task | project={} | id={}", task.getComponentKey(), task.getUuid());
+ } else {
+ profiler.stopInfo("Executed task | project={} | id={}", task.getComponentKey(), task.getUuid());
+ }
+ }
+}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation.taskprocessor;
-
-import java.util.concurrent.Callable;
-import org.sonar.server.computation.queue.CeQueue;
-import org.sonar.server.computation.queue.CeTask;
-
-/**
- * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}.
- */
-public interface CeWorkerRunnable extends Callable<Boolean> {
-}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package org.sonar.server.computation.taskprocessor;
-
-import com.google.common.base.Optional;
-import org.sonar.api.utils.log.Logger;
-import org.sonar.api.utils.log.Loggers;
-import org.sonar.core.util.logs.Profiler;
-import org.sonar.db.ce.CeActivityDto;
-import org.sonar.server.computation.log.CeLogging;
-import org.sonar.server.computation.queue.CeQueue;
-import org.sonar.server.computation.queue.CeTask;
-
-import static java.lang.String.format;
-
-public class CeWorkerRunnableImpl implements CeWorkerRunnable {
-
- private static final Logger LOG = Loggers.get(CeWorkerRunnableImpl.class);
-
- private final CeQueue queue;
- private final CeLogging ceLogging;
- private final CeTaskProcessorRepository taskProcessorRepository;
-
- public CeWorkerRunnableImpl(CeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository) {
- this.queue = queue;
- this.ceLogging = ceLogging;
- this.taskProcessorRepository = taskProcessorRepository;
- }
-
- @Override
- public Boolean call() throws Exception {
- Optional<CeTask> ceTask = tryAndFindTaskToExecute();
- if (!ceTask.isPresent()) {
- return false;
- }
-
- executeTask(ceTask.get());
- return true;
- }
-
- private Optional<CeTask> tryAndFindTaskToExecute() {
- try {
- return queue.peek();
- } catch (Exception e) {
- LOG.error("Failed to pop the queue of analysis reports", e);
- }
- return Optional.absent();
- }
-
- private void executeTask(CeTask task) {
- // logging twice: once in sonar.log and once in CE appender
- Profiler regularProfiler = startProfiler(task);
- ceLogging.initForTask(task);
- Profiler ceProfiler = startProfiler(task);
-
- CeActivityDto.Status status = CeActivityDto.Status.FAILED;
- try {
- // TODO delegate the message to the related task processor, according to task type
- Optional<CeTaskProcessor> taskProcessor = taskProcessorRepository.getForCeTask(task);
- if (taskProcessor.isPresent()) {
- taskProcessor.get().process(task);
- status = CeActivityDto.Status.SUCCESS;
- } else {
- LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType());
- status = CeActivityDto.Status.FAILED;
- }
- queue.remove(task, status);
- } catch (Throwable e) {
- LOG.error(format("Failed to execute task %s", task.getUuid()), e);
- queue.remove(task, status);
- } finally {
- // logging twice: once in sonar.log and once in CE appender
- stopProfiler(ceProfiler, task, status);
- ceLogging.clearForTask();
- stopProfiler(regularProfiler, task, status);
- }
- }
-
- private static Profiler startProfiler(CeTask task) {
- return Profiler.create(LOG).startInfo("Execute task | project={} | id={}", task.getComponentKey(), task.getUuid());
- }
-
- private static void stopProfiler(Profiler profiler, CeTask task, CeActivityDto.Status status) {
- if (status == CeActivityDto.Status.FAILED) {
- profiler.stopError("Executed task | project={} | id={}", task.getComponentKey(), task.getUuid());
- } else {
- profiler.stopInfo("Executed task | project={} | id={}", task.getComponentKey(), task.getUuid());
- }
- }
-}
// due to risks of infinite chaining of tasks/futures, a timeout is required for safety
public Timeout timeout = Timeout.seconds(60);
- private CeWorkerRunnable ceWorkerRunnable = mock(CeWorkerRunnable.class);
+ private CeWorkerCallable ceWorkerRunnable = mock(CeWorkerCallable.class);
private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService();
private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorkerRunnable, 2L, TimeUnit.SECONDS);
private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorkerRunnable);
private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, ceWorkerRunnable);
@Test
- public void polls_without_delay_when_CeWorkerRunnable_returns_true() throws Exception {
+ public void polls_without_delay_when_CeWorkerCallable_returns_true() throws Exception {
when(ceWorkerRunnable.call())
.thenReturn(true)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
}
@Test
- public void polls_without_delay_when_CeWorkerRunnable_throws_Exception_but_not_Error() throws Exception {
+ public void polls_without_delay_when_CeWorkerCallable_throws_Exception_but_not_Error() throws Exception {
when(ceWorkerRunnable.call())
.thenThrow(new Exception("Exception is followed by a poll without delay"))
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
}
@Test
- public void polls_with_regular_delay_when_CeWorkerRunnable_returns_false() throws Exception {
+ public void polls_with_regular_delay_when_CeWorkerCallable_returns_false() throws Exception {
when(ceWorkerRunnable.call())
.thenReturn(false)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
}
@Test
- public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() throws Exception {
+ public void startScheduling_schedules_CeWorkerCallable_at_fixed_rate_run_head_of_queue() throws Exception {
when(ceWorkerRunnable.call())
.thenReturn(true)
.thenReturn(true)
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import com.google.common.base.Optional;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.server.computation.log.CeLogging;
+import org.sonar.server.computation.queue.CeQueue;
+import org.sonar.server.computation.queue.CeQueueImpl;
+import org.sonar.server.computation.queue.CeTask;
+import org.sonar.server.computation.taskprocessor.report.ReportTaskProcessor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+public class CeWorkerCallableImplTest {
+
+ @Rule
+ public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule();
+
+ CeQueue queue = mock(CeQueueImpl.class);
+ ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
+ CeLogging ceLogging = mock(CeLogging.class);
+ CeWorkerCallable underTest = new CeWorkerCallableImpl(queue, ceLogging, taskProcessorRepository);
+ InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
+
+ @Test
+ public void no_pending_tasks_in_queue() throws Exception {
+ when(queue.peek()).thenReturn(Optional.<CeTask>absent());
+
+ assertThat(underTest.call()).isFalse();
+
+ verifyZeroInteractions(taskProcessor, ceLogging);
+ }
+
+ @Test
+ public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception {
+ CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
+ taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
+ when(queue.peek()).thenReturn(Optional.of(task));
+
+ assertThat(underTest.call()).isTrue();
+
+ inOrder.verify(ceLogging).initForTask(task);
+ inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED);
+ inOrder.verify(ceLogging).clearForTask();
+ }
+
+ @Test
+ public void peek_and_process_task() throws Exception {
+ CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
+ taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
+ when(queue.peek()).thenReturn(Optional.of(task));
+
+ assertThat(underTest.call()).isTrue();
+
+ inOrder.verify(ceLogging).initForTask(task);
+ inOrder.verify(taskProcessor).process(task);
+ inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS);
+ inOrder.verify(ceLogging).clearForTask();
+ }
+
+ @Test
+ public void fail_to_process_task() throws Exception {
+ CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
+ when(queue.peek()).thenReturn(Optional.of(task));
+ taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
+ doThrow(new IllegalStateException("simulate exception thrown by TaskProcessor#process")).when(taskProcessor).process(task);
+
+ assertThat(underTest.call()).isTrue();
+
+ inOrder.verify(ceLogging).initForTask(task);
+ inOrder.verify(taskProcessor).process(task);
+ inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED);
+ inOrder.verify(ceLogging).clearForTask();
+ }
+}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation.taskprocessor;
-
-import com.google.common.base.Optional;
-import org.junit.Rule;
-import org.junit.Test;
-import org.mockito.InOrder;
-import org.mockito.Mockito;
-import org.sonar.db.ce.CeActivityDto;
-import org.sonar.db.ce.CeTaskTypes;
-import org.sonar.server.computation.log.CeLogging;
-import org.sonar.server.computation.queue.CeQueue;
-import org.sonar.server.computation.queue.CeQueueImpl;
-import org.sonar.server.computation.queue.CeTask;
-import org.sonar.server.computation.taskprocessor.report.ReportTaskProcessor;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-public class CeWorkerRunnableImplTest {
-
- @Rule
- public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule();
-
- CeQueue queue = mock(CeQueueImpl.class);
- ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
- CeLogging ceLogging = mock(CeLogging.class);
- CeWorkerRunnable underTest = new CeWorkerRunnableImpl(queue, ceLogging, taskProcessorRepository);
- InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
-
- @Test
- public void no_pending_tasks_in_queue() throws Exception {
- when(queue.peek()).thenReturn(Optional.<CeTask>absent());
-
- assertThat(underTest.call()).isFalse();
-
- verifyZeroInteractions(taskProcessor, ceLogging);
- }
-
- @Test
- public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception {
- CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
- taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
- when(queue.peek()).thenReturn(Optional.of(task));
-
- assertThat(underTest.call()).isTrue();
-
- inOrder.verify(ceLogging).initForTask(task);
- inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED);
- inOrder.verify(ceLogging).clearForTask();
- }
-
- @Test
- public void peek_and_process_task() throws Exception {
- CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
- taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
- when(queue.peek()).thenReturn(Optional.of(task));
-
- assertThat(underTest.call()).isTrue();
-
- inOrder.verify(ceLogging).initForTask(task);
- inOrder.verify(taskProcessor).process(task);
- inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS);
- inOrder.verify(ceLogging).clearForTask();
- }
-
- @Test
- public void fail_to_process_task() throws Exception {
- CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
- when(queue.peek()).thenReturn(Optional.of(task));
- taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
- doThrow(new IllegalStateException("simulate exception thrown by TaskProcessor#process")).when(taskProcessor).process(task);
-
- assertThat(underTest.call()).isTrue();
-
- inOrder.verify(ceLogging).initForTask(task);
- inOrder.verify(taskProcessor).process(task);
- inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED);
- inOrder.verify(ceLogging).clearForTask();
- }
-}