2 * SonarQube, open source software quality management tool.
3 * Copyright (C) 2008-2014 SonarSource
4 * mailto:contact AT sonarsource DOT com
6 * SonarQube is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 3 of the License, or (at your option) any later version.
11 * SonarQube is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20 package org.sonar.server.computation.taskprocessor;
22 import com.google.common.util.concurrent.FutureCallback;
23 import com.google.common.util.concurrent.Futures;
24 import com.google.common.util.concurrent.ListenableFuture;
25 import com.google.common.util.concurrent.ListenableScheduledFuture;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nullable;
30 import org.picocontainer.Startable;
31 import org.sonar.api.utils.log.Logger;
32 import org.sonar.api.utils.log.Loggers;
34 import static com.google.common.util.concurrent.Futures.addCallback;
35 import static java.util.concurrent.TimeUnit.SECONDS;
/**
 * {@link CeProcessingScheduler} implementation which is also a picocontainer {@link Startable}.
 * It hands a {@link CeWorkerRunnable} to a {@link CeProcessingSchedulerExecutorService} and uses a
 * single {@code ChainingCallback} instance to react to the completion of each run.
 */
37 public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startable {
38 private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class);
// Executor on which the worker is scheduled/submitted and on which the callback is registered.
40 private final CeProcessingSchedulerExecutorService executorService;
// Task handed to the executor for every run.
41 private final CeWorkerRunnable workerRunnable;
// Delay passed to executorService.schedule(...), expressed in timeUnit.
43 private final long delayBetweenTasks;
// Unit of delayBetweenTasks.
44 private final TimeUnit timeUnit;
45 // warning: using a single ChainingCallback object for chaining works and is thread safe only because we use a single Thread in CeProcessingSchedulerExecutorService
46 private final ChainingCallback chainingCallback = new ChainingCallback();
/**
 * Creates a scheduler which runs {@code workerRunnable} on {@code processingExecutorService}.
 * The delay used for scheduled runs is fixed to 2 seconds.
 *
 * @param processingExecutorService executor used to schedule and submit the worker
 * @param workerRunnable the task to execute
 */
48 public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerRunnable workerRunnable) {
49 this.executorService = processingExecutorService;
50 this.workerRunnable = workerRunnable;
// Hard-coded delay: 2 SECONDS (SECONDS is the static import of TimeUnit.SECONDS).
52 this.delayBetweenTasks = 2;
53 this.timeUnit = SECONDS;
58 // nothing to do at component startup, startScheduling will be called by CeQueueInitializer
/**
 * Schedules an execution of the worker after {@code delayBetweenTasks} {@code timeUnit} and
 * registers the chaining callback on the returned future.
 */
62 public void startScheduling() {
63 ListenableScheduledFuture<Boolean> future = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit);
// addCallback is the static import of Futures.addCallback; executorService is passed as the
// executor on which chainingCallback is invoked.
65 addCallback(future, chainingCallback, executorService);
70 this.chainingCallback.stop();
/**
 * Callback attached to the worker's future. It keeps the future of the most recent worker
 * submission in {@code workerFuture} and can register itself again on that future
 * (see {@code addCallback()}).
 */
73 private class ChainingCallback implements FutureCallback<Boolean> {
// Starts as true; set to false by the stop logic at the end of this class.
74 private final AtomicBoolean keepRunning = new AtomicBoolean(true);
// Future assigned by chainWithoutDelay()/chainWithDelay(); null until one of them has assigned it.
76 private ListenableFuture<Boolean> workerFuture;
// Receives the result of the worker; a null result is handled like false.
// NOTE(review): the branch bodies are not visible in this excerpt; presumably a true result
// chains without delay and anything else chains with delay -- confirm.
79 public void onSuccess(@Nullable Boolean result) {
80 if (result != null && result) {
// Receives the failure of the worker; an Error is logged at error level with its stack trace.
// NOTE(review): the handling of non-Error throwables is not visible in this excerpt -- confirm.
88 public void onFailure(Throwable t) {
89 if (t instanceof Error) {
90 LOG.error("Compute Engine execution failed. Scheduled processing interrupted.", t);
// Submits the worker to the executor and remembers the resulting future.
96 private void chainWithoutDelay() {
98 workerFuture = executorService.submit(workerRunnable);
// Schedules the worker after delayBetweenTasks timeUnit and remembers the resulting future.
103 private void chainWithDelay() {
105 workerFuture = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit);
// Registers this callback on the current worker future, only when such a future exists and the
// keepRunning flag is still true.
110 private void addCallback() {
111 if (workerFuture != null && keepRunning()) {
112 Futures.addCallback(workerFuture, this, executorService);
// Returns the current value of the keepRunning flag.
116 private boolean keepRunning() {
117 return keepRunning.get();
// Stop logic: clear the flag so that addCallback() no longer registers this callback, then cancel
// the current worker future, if any. cancel(false) does not interrupt a task that is already running.
// NOTE(review): the enclosing method declaration is not visible in this excerpt; presumably this is
// the stop() invoked through chainingCallback.stop() -- confirm.
121 this.keepRunning.set(false);
122 if (workerFuture != null) {
123 workerFuture.cancel(false);