*/
long getCleanCeTasksDelay();
+ /**
+ * Delay before stopping workers during a graceful timeout using milliseconds unit.
+ */
+ int getGracefulStopTimeoutInMs();
+
}
import javax.annotation.CheckForNull;
import org.picocontainer.Startable;
+import org.sonar.api.config.Configuration;
import org.sonar.api.utils.MessageException;
import static java.lang.String.format;
private static final long CANCEL_WORN_OUTS_INITIAL_DELAY = 1;
// 10 minutes
private static final long CANCEL_WORN_OUTS_DELAY = 10;
+ // 40 seconds
+ private static final int GRACEFUL_STOP_TIMEOUT = 40;
+ public static final String SONAR_CE_GRACEFUL_STOP_TIME_OUT_IN_MS = "sonar.ce.gracefulStopTimeOutInMs";
@CheckForNull
private final WorkerCountProvider workerCountProvider;
private final int workerThreadCount;
+ private final int gracefultStopTimeoutInMs;
private int workerCount;
- public CeConfigurationImpl() {
+ public CeConfigurationImpl(Configuration configuration) {
this.workerCountProvider = null;
this.workerThreadCount = DEFAULT_WORKER_THREAD_COUNT;
this.workerCount = DEFAULT_WORKER_COUNT;
+ this.gracefultStopTimeoutInMs = configuration.getInt(SONAR_CE_GRACEFUL_STOP_TIME_OUT_IN_MS).orElse(GRACEFUL_STOP_TIMEOUT);
}
- public CeConfigurationImpl(WorkerCountProvider workerCountProvider) {
+ public CeConfigurationImpl(Configuration configuration, WorkerCountProvider workerCountProvider) {
this.workerCountProvider = workerCountProvider;
this.workerThreadCount = MAX_WORKER_THREAD_COUNT;
this.workerCount = readWorkerCount(workerCountProvider);
+ this.gracefultStopTimeoutInMs = configuration.getInt(SONAR_CE_GRACEFUL_STOP_TIME_OUT_IN_MS).orElse(GRACEFUL_STOP_TIMEOUT);
}
private static int readWorkerCount(WorkerCountProvider workerCountProvider) {
return CANCEL_WORN_OUTS_DELAY;
}
+ @Override
+ public int getGracefulStopTimeoutInMs() {
+ return gracefultStopTimeoutInMs;
+ }
+
}
private final TimeUnit timeUnit;
private final ChainingCallback[] chainingCallbacks;
private final EnabledCeWorkerController ceWorkerController;
+ private final int gracefulStopTimeoutInMs;
public CeProcessingSchedulerImpl(CeConfiguration ceConfiguration,
CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory,
this.executorService = processingExecutorService;
this.delayBetweenEnabledTasks = ceConfiguration.getQueuePollingDelay();
+ this.gracefulStopTimeoutInMs = ceConfiguration.getGracefulStopTimeoutInMs();
this.ceWorkerController = ceWorkerController;
this.timeUnit = MILLISECONDS;
}
// Workers have 40s to gracefully stop processing tasks
- long until = System.currentTimeMillis() + 40_000L;
+ long until = System.currentTimeMillis() + gracefulStopTimeoutInMs;
LOG.info("Waiting for workers to finish in-progress tasks");
while (System.currentTimeMillis() < until && ceWorkerController.hasAtLeastOneProcessingWorker()) {
try {
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.sonar.api.config.internal.ConfigurationBridge;
+import org.sonar.api.config.internal.MapSettings;
import org.sonar.api.utils.MessageException;
import static java.lang.Math.abs;
import static org.assertj.core.api.Assertions.assertThat;
public class CeConfigurationImplTest {
+ public static final ConfigurationBridge EMPTY_CONFIGURATION = new ConfigurationBridge(new MapSettings());
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void getWorkerCount_returns_1_when_there_is_no_WorkerCountProvider() {
- assertThat(new CeConfigurationImpl().getWorkerCount()).isEqualTo(1);
+ assertThat(new CeConfigurationImpl(EMPTY_CONFIGURATION).getWorkerCount()).isEqualTo(1);
}
@Test
public void getWorkerMaxCount_returns_1_when_there_is_no_WorkerCountProvider() {
- assertThat(new CeConfigurationImpl().getWorkerMaxCount()).isEqualTo(1);
+ assertThat(new CeConfigurationImpl(EMPTY_CONFIGURATION).getWorkerMaxCount()).isEqualTo(1);
}
@Test
int value = randomValidWorkerCount();
workerCountProvider.set(value);
- assertThat(new CeConfigurationImpl(workerCountProvider).getWorkerCount()).isEqualTo(value);
+ assertThat(new CeConfigurationImpl(EMPTY_CONFIGURATION, workerCountProvider).getWorkerCount()).isEqualTo(value);
}
@Test
int value = randomValidWorkerCount();
workerCountProvider.set(value);
- assertThat(new CeConfigurationImpl(workerCountProvider).getWorkerMaxCount()).isEqualTo(10);
+ assertThat(new CeConfigurationImpl(EMPTY_CONFIGURATION, workerCountProvider).getWorkerMaxCount()).isEqualTo(10);
}
@Test
expectMessageException(0);
- new CeConfigurationImpl(workerCountProvider);
+ new CeConfigurationImpl(EMPTY_CONFIGURATION, workerCountProvider);
}
@Test
expectMessageException(value);
- new CeConfigurationImpl(workerCountProvider);
+ new CeConfigurationImpl(EMPTY_CONFIGURATION, workerCountProvider);
}
@Test
expectMessageException(value);
- new CeConfigurationImpl(workerCountProvider);
+ new CeConfigurationImpl(EMPTY_CONFIGURATION, workerCountProvider);
}
private void expectMessageException(int value) {
@Test
public void getCleanCeTasksInitialDelay_returns_1() {
- assertThat(new CeConfigurationImpl().getCleanCeTasksInitialDelay())
+ assertThat(new CeConfigurationImpl(EMPTY_CONFIGURATION).getCleanCeTasksInitialDelay())
.isEqualTo(1L);
workerCountProvider.set(1);
- assertThat(new CeConfigurationImpl(workerCountProvider).getCleanCeTasksInitialDelay())
+ assertThat(new CeConfigurationImpl(EMPTY_CONFIGURATION, workerCountProvider).getCleanCeTasksInitialDelay())
.isEqualTo(1L);
}
@Test
public void getCleanCeTasksDelay_returns_10() {
- assertThat(new CeConfigurationImpl().getCleanCeTasksDelay())
+ assertThat(new CeConfigurationImpl(EMPTY_CONFIGURATION).getCleanCeTasksDelay())
.isEqualTo(10L);
workerCountProvider.set(1);
- assertThat(new CeConfigurationImpl(workerCountProvider).getCleanCeTasksDelay())
+ assertThat(new CeConfigurationImpl(EMPTY_CONFIGURATION, workerCountProvider).getCleanCeTasksDelay())
.isEqualTo(10L);
}
@Test
public void refresh_does_not_change_any_value_when_there_is_no_WorkerCountProvider() {
- CeConfigurationImpl underTest = new CeConfigurationImpl();
+ CeConfigurationImpl underTest = new CeConfigurationImpl(EMPTY_CONFIGURATION);
long cleanCeTasksInitialDelay = underTest.getCleanCeTasksInitialDelay();
long cleanCeTasksDelay = underTest.getCleanCeTasksDelay();
long queuePollingDelay = underTest.getQueuePollingDelay();
@Test
public void refresh_updates_only_workerCount_from_WorkerCountProvider_when_there_WorkerCountProvider_is_present() {
workerCountProvider.set(randomValidWorkerCount());
- CeConfigurationImpl underTest = new CeConfigurationImpl(workerCountProvider);
+ CeConfigurationImpl underTest = new CeConfigurationImpl(EMPTY_CONFIGURATION, workerCountProvider);
long cleanCeTasksInitialDelay = underTest.getCleanCeTasksInitialDelay();
long cleanCeTasksDelay = underTest.getCleanCeTasksDelay();
long queuePollingDelay = underTest.getQueuePollingDelay();
return cancelWornOutsDelay;
}
+ @Override
+ public int getGracefulStopTimeoutInMs() {
+ return 40_000;
+ }
+
public void setCleanCeTasksDelay(long cancelWornOutsDelay) {
checkArgument(cancelWornOutsDelay > 0, "cancel worn-outs polling delay must be >= 1");
this.cancelWornOutsDelay = cancelWornOutsDelay;
throw new UnsupportedOperationException("getCleanCeTasksDelay is not implemented");
}
+ @Override
+ public int getGracefulStopTimeoutInMs() {
+ return 40_000;
+ }
+
}
@CheckForNull
import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
@Test
public void stopping_CE_waits_for_in_progress_task_to_be_finished() throws Exception {
- try (ComputeEngine ce = new ComputeEngine()) {
+ try (ComputeEngine ce = new ComputeEngine(40_000)) {
try (LogsTailer.Watch watch = ce.logs().watch("CE analysis is paused")) {
ce.triggerTask();
}
@Test
- @Ignore("TODO make the graceful stop timeout configurable. 40 seconds is too long for a test.")
public void stopping_CE_kills_in_progress_tasks_if_too_long_to_gracefully_stop() throws Exception {
- try (ComputeEngine ce = new ComputeEngine()) {
+ try (ComputeEngine ce = new ComputeEngine(10)) {
try (LogsTailer.Watch watch = ce.logs().watch("CE analysis is paused")) {
ce.triggerTask();
assertThat(ce.countInProgressTasks()).isEqualTo(1);
}
- // resume the in-progress task, so that it can
- // finish successfully
+ // wait for task to be hard killed
try (LogsTailer.Watch watch = ce.logs().watch("Process [ce] is stopped")) {
watch.waitForLog();
- assertThat(ce.hasTaskFinishedSuccessfully()).isTrue();
+ assertThat(ce.hasTaskFinishedSuccessfully()).isFalse();
assertThat(ce.hasErrorLogs()).isTrue();
}
}
private final LogsTailer logsTailer;
private final LogsTailer.Content content = new LogsTailer.Content();
- ComputeEngine() throws Exception {
+ ComputeEngine(int timeOutInMs) throws Exception {
pauseFile = temp.newFile();
FileUtils.touch(pauseFile);
orchestrator = Orchestrator.builderEnv()
.setServerProperty("sonar.ce.pauseTask.path", pauseFile.getAbsolutePath())
+ .setServerProperty("sonar.ce.gracefulStopTimeOutInMs", "" + timeOutInMs)
.addPlugin(ItUtils.xooPlugin())
.addPlugin(ItUtils.pluginArtifact("server-plugin"))
.build();