import org.sonar.ce.httpd.CeHttpServer;
import org.sonar.ce.logging.ChangeLogLevelHttpAction;
import org.sonar.ce.systeminfo.SystemInfoHttpAction;
-import org.sonar.ce.taskprocessor.RefreshWorkerCountAction;
import org.sonar.core.platform.Module;
public class CeHttpModule extends Module {
add(
CeHttpServer.class,
SystemInfoHttpAction.class,
- ChangeLogLevelHttpAction.class,
- RefreshWorkerCountAction.class);
+ ChangeLogLevelHttpAction.class);
}
}
public interface CeConfiguration {
- /**
- * Requests {@link CeConfiguration} to refresh its state, if it has any.
- */
- void refresh();
-
/**
* The maximum number of workers to process CeTasks concurrently, integer strictly greater than 0.
*/
package org.sonar.ce.configuration;
import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
import org.picocontainer.Startable;
import org.sonar.api.config.Configuration;
import org.sonar.api.utils.MessageException;
private int workerCount;
public CeConfigurationImpl(Configuration configuration) {
- this.workerCountProvider = null;
- this.workerThreadCount = DEFAULT_WORKER_THREAD_COUNT;
- this.workerCount = DEFAULT_WORKER_COUNT;
- this.gracefultStopTimeoutInMs = configuration.getInt(SONAR_CE_GRACEFUL_STOP_TIME_OUT_IN_MS).orElse(GRACEFUL_STOP_TIMEOUT);
+ this(configuration, null);
}
- public CeConfigurationImpl(Configuration configuration, WorkerCountProvider workerCountProvider) {
+ public CeConfigurationImpl(Configuration configuration, @Nullable WorkerCountProvider workerCountProvider) {
this.workerCountProvider = workerCountProvider;
- this.workerThreadCount = MAX_WORKER_THREAD_COUNT;
- this.workerCount = readWorkerCount(workerCountProvider);
this.gracefultStopTimeoutInMs = configuration.getInt(SONAR_CE_GRACEFUL_STOP_TIME_OUT_IN_MS).orElse(GRACEFUL_STOP_TIMEOUT);
+ if (workerCountProvider == null) {
+ this.workerCount = DEFAULT_WORKER_COUNT;
+ this.workerThreadCount = DEFAULT_WORKER_THREAD_COUNT;
+ } else {
+ this.workerCount = readWorkerCount(workerCountProvider);
+ this.workerThreadCount = MAX_WORKER_THREAD_COUNT;
+ }
}
- private static int readWorkerCount(WorkerCountProvider workerCountProvider) {
+ private static synchronized int readWorkerCount(WorkerCountProvider workerCountProvider) {
int value = workerCountProvider.get();
if (value < DEFAULT_WORKER_COUNT || value > MAX_WORKER_THREAD_COUNT) {
throw parsingError(value);
private static MessageException parsingError(int value) {
return MessageException.of(format(
- "Worker count '%s' is invalid. It must an integer strictly greater than 0 and less or equal to 10",
- value));
+ "Worker count '%s' is invalid. It must be an integer strictly greater than 0 and less or equal to 10",
+ value));
}
@Override
public void start() {
- //
+ // nothing to do
}
@Override
// nothing to do
}
- @Override
- public void refresh() {
- if (workerCountProvider != null) {
- this.workerCount = readWorkerCount(workerCountProvider);
- }
- }
-
@Override
public int getWorkerMaxCount() {
return workerThreadCount;
@Override
public int getWorkerCount() {
+ if (workerCountProvider != null) {
+ workerCount = readWorkerCount(workerCountProvider);
+ }
return workerCount;
}
interface ProcessingRecorderHook extends AutoCloseable {
}
- /**
- * Requests the {@link EnabledCeWorkerController} to refresh its state, if it has any.
- */
- void refresh();
-
/**
* Returns {@code true} if the specified {@link CeWorker} is enabled
*/
package org.sonar.ce.taskprocessor;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
import org.sonar.api.utils.log.Loggers;
import org.sonar.ce.configuration.CeConfiguration;
public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController {
private final ConcurrentHashMap<CeWorker, Status> map = new ConcurrentHashMap<>();
private final CeConfiguration ceConfiguration;
- private final AtomicInteger workerCount;
enum Status {
PROCESSING, PAUSED
public EnabledCeWorkerControllerImpl(CeConfiguration ceConfiguration) {
this.ceConfiguration = ceConfiguration;
- this.workerCount = new AtomicInteger(ceConfiguration.getWorkerCount());
logEnabledWorkerCount();
}
private void logEnabledWorkerCount() {
- if (workerCount.get() > 1) {
- Loggers.get(EnabledCeWorkerController.class).info("Compute Engine will use {} concurrent workers to process tasks", this.workerCount);
+ int workerCount = ceConfiguration.getWorkerCount();
+ if (workerCount > 1) {
+ Loggers.get(EnabledCeWorkerController.class).info("Compute Engine will use {} concurrent workers to process tasks", workerCount);
}
}
- @Override
- public void refresh() {
- ceConfiguration.refresh();
- this.workerCount.set(ceConfiguration.getWorkerCount());
- logEnabledWorkerCount();
- }
-
@Override
public ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker) {
return new ProcessingRecorderHookImpl(ceWorker);
*/
@Override
public boolean isEnabled(CeWorker ceWorker) {
- return ceWorker.getOrdinal() < workerCount.get();
+ return ceWorker.getOrdinal() < ceConfiguration.getWorkerCount();
}
private class ProcessingRecorderHookImpl implements ProcessingRecorderHook {
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2018 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.ce.taskprocessor;
-
-import fi.iki.elonen.NanoHTTPD;
-import org.sonar.ce.httpd.HttpAction;
-
-import static fi.iki.elonen.NanoHTTPD.MIME_PLAINTEXT;
-import static fi.iki.elonen.NanoHTTPD.newFixedLengthResponse;
-import static fi.iki.elonen.NanoHTTPD.Response.Status.METHOD_NOT_ALLOWED;
-import static fi.iki.elonen.NanoHTTPD.Response.Status.OK;
-
-public class RefreshWorkerCountAction implements HttpAction {
- private static final String PATH = "refreshWorkerCount";
-
- private final EnabledCeWorkerController enabledCeWorkerController;
-
- public RefreshWorkerCountAction(EnabledCeWorkerController enabledCeWorkerController) {
- this.enabledCeWorkerController = enabledCeWorkerController;
- }
-
- @Override
- public void register(ActionRegistry registry) {
- registry.register(PATH, this);
- }
-
- @Override
- public NanoHTTPD.Response serve(NanoHTTPD.IHTTPSession session) {
- if (session.getMethod() != NanoHTTPD.Method.POST) {
- return newFixedLengthResponse(METHOD_NOT_ALLOWED, MIME_PLAINTEXT, null);
- }
-
- enabledCeWorkerController.refresh();
-
- return newFixedLengthResponse(OK, MIME_PLAINTEXT, null);
- }
-}
package org.sonar.ce.configuration;
import java.util.Random;
-import java.util.stream.IntStream;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
private void expectMessageException(int value) {
expectedException.expect(MessageException.class);
expectedException.expectMessage("Worker count '" + value + "' is invalid. " +
- "It must an integer strictly greater than 0 and less or equal to 10");
+ "It must be an integer strictly greater than 0 and less or equal to 10");
}
@Test
.isEqualTo(2L);
}
- @Test
- public void refresh_does_not_change_any_value_when_there_is_no_WorkerCountProvider() {
- CeConfigurationImpl underTest = new CeConfigurationImpl(EMPTY_CONFIGURATION);
- long cleanCeTasksInitialDelay = underTest.getCleanCeTasksInitialDelay();
- long cleanCeTasksDelay = underTest.getCleanCeTasksDelay();
- long queuePollingDelay = underTest.getQueuePollingDelay();
- int workerThreadCount = underTest.getWorkerMaxCount();
- int workerCount = underTest.getWorkerCount();
-
- IntStream.range(0, 1 + abs(new Random().nextInt(10)))
- .forEach(ignored -> {
- underTest.refresh();
- assertThat(underTest.getCleanCeTasksInitialDelay()).isEqualTo(cleanCeTasksInitialDelay);
- assertThat(underTest.getCleanCeTasksDelay()).isEqualTo(cleanCeTasksDelay);
- assertThat(underTest.getQueuePollingDelay()).isEqualTo(queuePollingDelay);
- assertThat(underTest.getWorkerMaxCount()).isEqualTo(workerThreadCount);
- assertThat(underTest.getWorkerCount()).isEqualTo(workerCount);
- });
- }
-
- @Test
- public void refresh_updates_only_workerCount_from_WorkerCountProvider_when_there_WorkerCountProvider_is_present() {
- workerCountProvider.set(randomValidWorkerCount());
- CeConfigurationImpl underTest = new CeConfigurationImpl(EMPTY_CONFIGURATION, workerCountProvider);
- long cleanCeTasksInitialDelay = underTest.getCleanCeTasksInitialDelay();
- long cleanCeTasksDelay = underTest.getCleanCeTasksDelay();
- long queuePollingDelay = underTest.getQueuePollingDelay();
- int workerThreadCount = underTest.getWorkerMaxCount();
-
- IntStream.range(0, 1 + abs(new Random().nextInt(10)))
- .forEach(ignored -> {
- int newWorkerCount = randomValidWorkerCount();
- workerCountProvider.set(newWorkerCount);
-
- underTest.refresh();
-
- assertThat(underTest.getCleanCeTasksInitialDelay()).isEqualTo(cleanCeTasksInitialDelay);
- assertThat(underTest.getCleanCeTasksDelay()).isEqualTo(cleanCeTasksDelay);
- assertThat(underTest.getQueuePollingDelay()).isEqualTo(queuePollingDelay);
- assertThat(underTest.getWorkerMaxCount()).isEqualTo(workerThreadCount);
- assertThat(underTest.getWorkerCount()).isEqualTo(newWorkerCount);
- });
- }
-
private static final class SimpleWorkerCountProvider implements WorkerCountProvider {
private int value = 0;
*/
package org.sonar.ce.configuration;
-import java.util.function.Consumer;
import org.junit.rules.ExternalResource;
import static com.google.common.base.Preconditions.checkArgument;
private long queuePollingDelay = 2 * 1000L;
private long cancelWornOutsInitialDelay = 1L;
private long cancelWornOutsDelay = 10L;
- private Consumer<CeConfigurationRule> refreshCallHook;
-
- @Override
- public void refresh() {
- if (this.refreshCallHook != null) {
- this.refreshCallHook.accept(this);
- }
- }
-
- public void setRefreshCallHook(Consumer<CeConfigurationRule> refreshCallHook) {
- this.refreshCallHook = refreshCallHook;
- }
@Override
public int getWorkerMaxCount() {
+ 72 // level 4
+ 6 // content of CeConfigurationModule
+ 4 // content of CeQueueModule
- + 4 // content of CeHttpModule
+ + 3 // content of CeHttpModule
+ 3 // content of CeTaskCommonsModule
+ 4 // content of ProjectAnalysisTaskModule
+ 7 // content of CeTaskProcessorModule
}
private static class DumbCeConfiguration implements CeConfiguration {
- @Override
- public void refresh() {
- throw new UnsupportedOperationException("Refresh is not implemented");
- }
-
@Override
public int getWorkerMaxCount() {
return WORKER_MAX_COUNT;
}
@Test
- public void workerCount_is_loaded_in_constructor() {
- when(ceWorker.getOrdinal()).thenReturn(randomWorkerCount);
- assertThat(underTest.isEnabled(ceWorker)).isFalse();
-
- ceConfigurationRule.setWorkerCount(randomWorkerCount + 1);
- assertThat(underTest.isEnabled(ceWorker)).isFalse();
- }
+ public void workerCount_is_always_reloaded() {
+ when(ceWorker.getOrdinal()).thenReturn(1);
- @Test
- public void refresh_reloads_workerCount() {
- when(ceWorker.getOrdinal()).thenReturn(randomWorkerCount);
+ ceConfigurationRule.setWorkerCount(1);
assertThat(underTest.isEnabled(ceWorker)).isFalse();
- ceConfigurationRule.setRefreshCallHook((rule) -> rule.setWorkerCount(randomWorkerCount + 1));
-
- underTest.refresh();
+ ceConfigurationRule.setWorkerCount(2);
assertThat(underTest.isEnabled(ceWorker)).isTrue();
}
- @Test
- public void refresh_writes_info_log_if_workerCount_is_greater_than_1() {
- logTester.clear();
- int newWorkerCount = randomWorkerCount + 1;
- ceConfigurationRule.setRefreshCallHook((rule) -> rule.setWorkerCount(newWorkerCount));
-
- underTest.refresh();
-
- verifyInfoLog(newWorkerCount);
- }
-
- @Test
- public void refresh_writes_no_info_log_if_workerCount_is_1() {
- logTester.clear();
- ceConfigurationRule.setRefreshCallHook((rule) -> rule.setWorkerCount(1));
-
- underTest.refresh();
-
- assertThat(logTester.logs()).isEmpty();
- }
-
private void verifyInfoLog(int workerCount) {
assertThat(logTester.logs()).hasSize(1);
assertThat(logTester.logs(LoggerLevel.INFO))
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2018 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.ce.taskprocessor;
-
-import fi.iki.elonen.NanoHTTPD;
-import org.junit.Test;
-import org.sonar.ce.httpd.HttpAction;
-
-import static fi.iki.elonen.NanoHTTPD.Method.GET;
-import static fi.iki.elonen.NanoHTTPD.Method.POST;
-import static fi.iki.elonen.NanoHTTPD.Response.Status.METHOD_NOT_ALLOWED;
-import static fi.iki.elonen.NanoHTTPD.Response.Status.OK;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.sonar.ce.httpd.CeHttpUtils.createHttpSession;
-
-public class RefreshWorkerCountActionTest {
- private EnabledCeWorkerController enabledCeWorkerController = mock(EnabledCeWorkerController.class);
- private RefreshWorkerCountAction underTest = new RefreshWorkerCountAction(enabledCeWorkerController);
-
- @Test
- public void register_to_path_changeLogLevel() {
- HttpAction.ActionRegistry actionRegistry = mock(HttpAction.ActionRegistry.class);
-
- underTest.register(actionRegistry);
-
- verify(actionRegistry).register("refreshWorkerCount", underTest);
- }
-
- @Test
- public void serves_METHOD_NOT_ALLOWED_error_when_method_is_not_POST() {
- NanoHTTPD.Response response = underTest.serve(createHttpSession(GET));
-
- assertThat(response.getStatus()).isEqualTo(METHOD_NOT_ALLOWED);
- verifyZeroInteractions(enabledCeWorkerController);
- }
-
- @Test
- public void call_EnabledCeWorkerController_refresh_on_POST() {
- NanoHTTPD.Response response = underTest.serve(createHttpSession(POST));
-
- assertThat(response.getStatus()).isEqualTo(OK);
- verify(enabledCeWorkerController).refresh();
- verifyNoMoreInteractions(enabledCeWorkerController);
- }
-}
Optional<ProtobufSystemInfo.SystemInfo> retrieveSystemInfo();
void changeLogLevel(LoggerLevel level);
-
- void refreshCeWorkerCount();
}
}
}
- @Override
- public void refreshCeWorkerCount() {
- call(RefreshCeWorkerCountActionClient.INSTANCE);
- }
-
- private enum RefreshCeWorkerCountActionClient implements ActionClient<Void> {
- INSTANCE;
-
- @Override
- public String getPath() {
- return "refreshWorkerCount";
- }
-
- @Override
- public Void getDefault() {
- return null;
- }
-
- @Override
- public Void call(String url) throws Exception {
- okhttp3.Request request = new okhttp3.Request.Builder()
- .post(RequestBody.create(null, new byte[0]))
- .url(url)
- .build();
- try (okhttp3.Response response = new OkHttpClient().newCall(request).execute()) {
- if (response.code() != 200) {
- throw new IOException(
- String.format(
- "Failed to trigger refresh of CE Worker count. Code was '%s' and response was '%s' for url '%s'",
- response.code(),
- response.body().string(),
- url));
- }
- return null;
- }
- }
- }
-
private <T> T call(ActionClient<T> actionClient) {
try (DefaultProcessCommands commands = DefaultProcessCommands.secondary(ipcSharedDir, COMPUTE_ENGINE.getIpcIndex())) {
if (commands.isUp()) {
underTest.changeLogLevel(LoggerLevel.INFO);
}
- @Test
- public void refreshCeWorkerCount_throws_ISE_if_http_error() {
- String message = "blah";
- server.enqueue(new MockResponse().setResponseCode(500).setBody(message));
- // initialize registration of process
- setUpWithHttpUrl(ProcessId.COMPUTE_ENGINE);
-
- expectedException.expect(IllegalStateException.class);
- expectedException.expectMessage("Failed to call HTTP server of process " + ProcessId.COMPUTE_ENGINE);
- expectedException.expectCause(hasType(IOException.class)
- .andMessage(format("Failed to trigger refresh of CE Worker count. Code was '500' and response was 'blah' for url " +
- "'http://%s:%s/refreshWorkerCount'", server.getHostName(), server.getPort())));
-
- underTest.refreshCeWorkerCount();
- }
-
- @Test
- public void refreshCeWorkerCount_does_not_fail_when_http_code_is_200() {
- server.enqueue(new MockResponse().setResponseCode(200));
-
- setUpWithHttpUrl(ProcessId.COMPUTE_ENGINE);
-
- underTest.refreshCeWorkerCount();
- }
-
private void setUpWithHttpUrl(ProcessId processId) {
try (DefaultProcessCommands processCommands = DefaultProcessCommands.secondary(ipcSharedDir, processId.getIpcIndex())) {
processCommands.setUp();