]> source.dussan.org Git - sonarqube.git/blob
a5040317030ec7386aeec037f611f3edc81ed22b
[sonarqube.git] /
1 /*
2  * SonarQube, open source software quality management tool.
3  * Copyright (C) 2008-2014 SonarSource
4  * mailto:contact AT sonarsource DOT com
5  *
6  * SonarQube is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 3 of the License, or (at your option) any later version.
10  *
11  * SonarQube is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License
17  * along with this program; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
19  */
20 package org.sonar.server.computation.taskprocessor;
21
22 import com.google.common.util.concurrent.FutureCallback;
23 import com.google.common.util.concurrent.Futures;
24 import com.google.common.util.concurrent.ListenableFuture;
25 import com.google.common.util.concurrent.ListenableScheduledFuture;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nullable;
30 import org.picocontainer.Startable;
31 import org.sonar.api.utils.log.Logger;
32 import org.sonar.api.utils.log.Loggers;
33
34 import static com.google.common.util.concurrent.Futures.addCallback;
35 import static java.util.concurrent.TimeUnit.SECONDS;
36
37 public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startable {
38   private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class);
39
40   private final CeProcessingSchedulerExecutorService executorService;
41   private final CeWorkerRunnable workerRunnable;
42
43   private final long delayBetweenTasks;
44   private final TimeUnit timeUnit;
45   // warning: using a single ChainingCallback object for chaining works and is thread safe only because we use a single Thread in CeProcessingSchedulerExecutorService
46   private final ChainingCallback chainingCallback = new ChainingCallback();
47
48   public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerRunnable workerRunnable) {
49     this.executorService = processingExecutorService;
50     this.workerRunnable = workerRunnable;
51
52     this.delayBetweenTasks = 2;
53     this.timeUnit = SECONDS;
54   }
55
56   @Override
57   public void start() {
58     // nothing to do at component startup, startScheduling will be called by CeQueueInitializer
59   }
60
61   @Override
62   public void startScheduling() {
63     ListenableScheduledFuture<Boolean> future = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit);
64
65     addCallback(future, chainingCallback, executorService);
66   }
67
68   @Override
69   public void stop() {
70     this.chainingCallback.stop();
71   }
72
73   private class ChainingCallback implements FutureCallback<Boolean> {
74     private final AtomicBoolean keepRunning = new AtomicBoolean(true);
75     @CheckForNull
76     private ListenableFuture<Boolean> workerFuture;
77
78     @Override
79     public void onSuccess(@Nullable Boolean result) {
80       if (result != null && result) {
81         chainWithoutDelay();
82       } else {
83         chainWithDelay();
84       }
85     }
86
87     @Override
88     public void onFailure(Throwable t) {
89       if (t instanceof Error) {
90         LOG.error("Compute Engine execution failed. Scheduled processing interrupted.", t);
91       } else {
92         chainWithoutDelay();
93       }
94     }
95
96     private void chainWithoutDelay() {
97       if (keepRunning()) {
98         workerFuture = executorService.submit(workerRunnable);
99       }
100       addCallback();
101     }
102
103     private void chainWithDelay() {
104       if (keepRunning()) {
105         workerFuture = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit);
106       }
107       addCallback();
108     }
109
110     private void addCallback() {
111       if (workerFuture != null && keepRunning()) {
112         Futures.addCallback(workerFuture, this, executorService);
113       }
114     }
115
116     private boolean keepRunning() {
117       return keepRunning.get();
118     }
119
120     public void stop() {
121       this.keepRunning.set(false);
122       if (workerFuture != null) {
123         workerFuture.cancel(false);
124       }
125     }
126   }
127 }