import org.sonar.ce.httpd.CeHttpServer;
import org.sonar.ce.logging.ChangeLogLevelHttpAction;
import org.sonar.ce.systeminfo.SystemInfoHttpAction;
+import org.sonar.ce.taskprocessor.RefreshWorkerCountAction;
import org.sonar.core.platform.Module;
public class CeHttpModule extends Module {
add(
CeHttpServer.class,
SystemInfoHttpAction.class,
- ChangeLogLevelHttpAction.class);
+ ChangeLogLevelHttpAction.class,
+ RefreshWorkerCountAction.class);
}
}
public interface CeConfiguration {
+ /**
+ * Requests {@link CeConfiguration} to refresh its state, if it has any.
+ */
+ void refresh();
+
/**
* The maximum number of workers to process CeTasks concurrently, integer strictly greater than 0.
*/
*/
package org.sonar.ce.configuration;
+import javax.annotation.CheckForNull;
import org.picocontainer.Startable;
import org.sonar.api.utils.MessageException;
-import org.sonar.api.utils.log.Logger;
-import org.sonar.api.utils.log.Loggers;
import static java.lang.String.format;
* {@link CeConfiguration#getQueuePollingDelay()} is called.
*/
public class CeConfigurationImpl implements CeConfiguration, Startable {
- private static final Logger LOG = Loggers.get(CeConfigurationImpl.class);
-
private static final int DEFAULT_WORKER_THREAD_COUNT = 1;
private static final int MAX_WORKER_THREAD_COUNT = 10;
private static final int DEFAULT_WORKER_COUNT = 1;
// 10 minutes
private static final long CANCEL_WORN_OUTS_DELAY = 10;
+ @CheckForNull
+ private final WorkerCountProvider workerCountProvider;
private final int workerThreadCount;
- private final int workerCount;
+ private int workerCount;
public CeConfigurationImpl() {
+ this.workerCountProvider = null;
this.workerThreadCount = DEFAULT_WORKER_THREAD_COUNT;
this.workerCount = DEFAULT_WORKER_COUNT;
}
public CeConfigurationImpl(WorkerCountProvider workerCountProvider) {
+ this.workerCountProvider = workerCountProvider;
+ this.workerThreadCount = MAX_WORKER_THREAD_COUNT;
+ this.workerCount = readWorkerCount(workerCountProvider);
+ }
+
+ private static int readWorkerCount(WorkerCountProvider workerCountProvider) {
int value = workerCountProvider.get();
if (value < DEFAULT_WORKER_COUNT || value > MAX_WORKER_THREAD_COUNT) {
throw parsingError(value);
}
- this.workerThreadCount = MAX_WORKER_THREAD_COUNT;
- this.workerCount = value;
+ return value;
}
private static MessageException parsingError(int value) {
@Override
public void start() {
- if (this.workerCount > 1) {
- LOG.info("Compute Engine will use {} concurrent workers to process tasks", this.workerCount);
- }
+ //
}
@Override
// nothing to do
}
+ @Override
+ public void refresh() {
+ if (workerCountProvider != null) {
+ this.workerCount = readWorkerCount(workerCountProvider);
+ }
+ }
+
@Override
public int getWorkerMaxCount() {
return workerThreadCount;
* task to process.
*/
public interface EnabledCeWorkerController {
+ /**
+ * Requests the {@link EnabledCeWorkerController} to refresh its state, if it has any.
+ */
+ void refresh();
+
/**
* Returns {@code true} if the specified {@link CeWorker} is enabled
*/
package org.sonar.ce.taskprocessor;
import java.util.concurrent.atomic.AtomicInteger;
+import org.sonar.api.utils.log.Loggers;
import org.sonar.ce.configuration.CeConfiguration;
public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController {
+ private final CeConfiguration ceConfiguration;
private final AtomicInteger workerCount;
public EnabledCeWorkerControllerImpl(CeConfiguration ceConfiguration) {
+ this.ceConfiguration = ceConfiguration;
this.workerCount = new AtomicInteger(ceConfiguration.getWorkerCount());
+ logEnabledWorkerCount();
+ }
+
+ private void logEnabledWorkerCount() {
+ if (workerCount.get() > 1) {
+ Loggers.get(EnabledCeWorkerController.class).info("Compute Engine will use {} concurrent workers to process tasks", this.workerCount);
+ }
+ }
+
+ @Override
+ public void refresh() {
+ ceConfiguration.refresh();
+ this.workerCount.set(ceConfiguration.getWorkerCount());
+ logEnabledWorkerCount();
}
/**
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import fi.iki.elonen.NanoHTTPD;
+import org.sonar.ce.httpd.HttpAction;
+
+import static fi.iki.elonen.NanoHTTPD.MIME_PLAINTEXT;
+import static fi.iki.elonen.NanoHTTPD.newFixedLengthResponse;
+import static fi.iki.elonen.NanoHTTPD.Response.Status.METHOD_NOT_ALLOWED;
+import static fi.iki.elonen.NanoHTTPD.Response.Status.OK;
+
+public class RefreshWorkerCountAction implements HttpAction {
+ private static final String PATH = "refreshWorkerCount";
+
+ private final EnabledCeWorkerController enabledCeWorkerController;
+
+ public RefreshWorkerCountAction(EnabledCeWorkerController enabledCeWorkerController) {
+ this.enabledCeWorkerController = enabledCeWorkerController;
+ }
+
+ @Override
+ public void register(ActionRegistry registry) {
+ registry.register(PATH, this);
+ }
+
+ @Override
+ public NanoHTTPD.Response serve(NanoHTTPD.IHTTPSession session) {
+ if (session.getMethod() != NanoHTTPD.Method.POST) {
+ return newFixedLengthResponse(METHOD_NOT_ALLOWED, MIME_PLAINTEXT, null);
+ }
+
+ enabledCeWorkerController.refresh();
+
+ return newFixedLengthResponse(OK, MIME_PLAINTEXT, null);
+ }
+}
package org.sonar.ce.configuration;
import java.util.Random;
+import java.util.stream.IntStream;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
}
@Test
- public void getWorkerThreadCount_returns_1_when_there_is_no_WorkerCountProvider() {
+ public void getWorkerMaxCount_returns_1_when_there_is_no_WorkerCountProvider() {
assertThat(new CeConfigurationImpl().getWorkerMaxCount()).isEqualTo(1);
}
@Test
public void getWorkerCount_returns_value_returned_by_WorkerCountProvider_when_available() {
- int value = 1 + Math.abs(new Random().nextInt(10));
+ int value = randomValidWorkerCount();
workerCountProvider.set(value);
assertThat(new CeConfigurationImpl(workerCountProvider).getWorkerCount()).isEqualTo(value);
}
@Test
- public void getWorkerThreadCount_returns_10_whichever_the_value_returned_by_WorkerCountProvider() {
- int value = 1 + Math.abs(new Random().nextInt(10));
+ public void getWorkerMaxCount_returns_10_whichever_the_value_returned_by_WorkerCountProvider() {
+ int value = randomValidWorkerCount();
workerCountProvider.set(value);
assertThat(new CeConfigurationImpl(workerCountProvider).getWorkerMaxCount()).isEqualTo(10);
private void expectMessageException(int value) {
expectedException.expect(MessageException.class);
expectedException.expectMessage("Worker count '" + value + "' is invalid. " +
- "It must an integer strictly greater than 0 and less or equal to 10");
+ "It must an integer strictly greater than 0 and less or equal to 10");
}
@Test
public void getCleanCeTasksInitialDelay_returns_1() {
assertThat(new CeConfigurationImpl().getCleanCeTasksInitialDelay())
- .isEqualTo(1L);
+ .isEqualTo(1L);
workerCountProvider.set(1);
assertThat(new CeConfigurationImpl(workerCountProvider).getCleanCeTasksInitialDelay())
- .isEqualTo(1L);
+ .isEqualTo(1L);
}
@Test
public void getCleanCeTasksDelay_returns_10() {
assertThat(new CeConfigurationImpl().getCleanCeTasksDelay())
- .isEqualTo(10L);
+ .isEqualTo(10L);
workerCountProvider.set(1);
assertThat(new CeConfigurationImpl(workerCountProvider).getCleanCeTasksDelay())
- .isEqualTo(10L);
+ .isEqualTo(10L);
+ }
+
+ @Test
+ public void refresh_does_not_change_any_value_when_there_is_no_WorkerCountProvider() {
+ CeConfigurationImpl underTest = new CeConfigurationImpl();
+ long cleanCeTasksInitialDelay = underTest.getCleanCeTasksInitialDelay();
+ long cleanCeTasksDelay = underTest.getCleanCeTasksDelay();
+ long queuePollingDelay = underTest.getQueuePollingDelay();
+ int workerThreadCount = underTest.getWorkerMaxCount();
+ int workerCount = underTest.getWorkerCount();
+
+ IntStream.range(0, 1 + abs(new Random().nextInt(10)))
+ .forEach(ignored -> {
+ underTest.refresh();
+ assertThat(underTest.getCleanCeTasksInitialDelay()).isEqualTo(cleanCeTasksInitialDelay);
+ assertThat(underTest.getCleanCeTasksDelay()).isEqualTo(cleanCeTasksDelay);
+ assertThat(underTest.getQueuePollingDelay()).isEqualTo(queuePollingDelay);
+ assertThat(underTest.getWorkerMaxCount()).isEqualTo(workerThreadCount);
+ assertThat(underTest.getWorkerCount()).isEqualTo(workerCount);
+ });
+ }
+
+ @Test
+ public void refresh_updates_only_workerCount_from_WorkerCountProvider_when_there_WorkerCountProvider_is_present() {
+ workerCountProvider.set(randomValidWorkerCount());
+ CeConfigurationImpl underTest = new CeConfigurationImpl(workerCountProvider);
+ long cleanCeTasksInitialDelay = underTest.getCleanCeTasksInitialDelay();
+ long cleanCeTasksDelay = underTest.getCleanCeTasksDelay();
+ long queuePollingDelay = underTest.getQueuePollingDelay();
+ int workerThreadCount = underTest.getWorkerMaxCount();
+
+ IntStream.range(0, 1 + abs(new Random().nextInt(10)))
+ .forEach(ignored -> {
+ int newWorkerCount = randomValidWorkerCount();
+ workerCountProvider.set(newWorkerCount);
+
+ underTest.refresh();
+
+ assertThat(underTest.getCleanCeTasksInitialDelay()).isEqualTo(cleanCeTasksInitialDelay);
+ assertThat(underTest.getCleanCeTasksDelay()).isEqualTo(cleanCeTasksDelay);
+ assertThat(underTest.getQueuePollingDelay()).isEqualTo(queuePollingDelay);
+ assertThat(underTest.getWorkerMaxCount()).isEqualTo(workerThreadCount);
+ assertThat(underTest.getWorkerCount()).isEqualTo(newWorkerCount);
+ });
}
private static final class SimpleWorkerCountProvider implements WorkerCountProvider {
return value;
}
}
+
+ private static int randomValidWorkerCount() {
+ return 1 + Math.abs(new Random().nextInt(10));
+ }
}
*/
package org.sonar.ce.configuration;
+import java.util.function.Consumer;
import org.junit.rules.ExternalResource;
import static com.google.common.base.Preconditions.checkArgument;
private long queuePollingDelay = 2 * 1000L;
private long cancelWornOutsInitialDelay = 1L;
private long cancelWornOutsDelay = 10L;
+ private Consumer<CeConfigurationRule> refreshCallHook;
+
+ @Override
+ public void refresh() {
+ if (this.refreshCallHook != null) {
+ this.refreshCallHook.accept(this);
+ }
+ }
+
+ public void setRefreshCallHook(Consumer<CeConfigurationRule> refreshCallHook) {
+ this.refreshCallHook = refreshCallHook;
+ }
@Override
public int getWorkerMaxCount() {
+ 73 // level 4
+ 4 // content of CeConfigurationModule
+ 4 // content of CeQueueModule
- + 3 // content of CeHttpModule
+ + 4 // content of CeHttpModule
+ 3 // content of CeTaskCommonsModule
+ 4 // content of ProjectAnalysisTaskModule
+ 5 // content of CeTaskProcessorModule
private ChangeLogLevelHttpAction underTest = new ChangeLogLevelHttpAction(serverLogging, database, ceProcessLogging);
@Test
- public void register_to_path_systemInfo() {
+ public void register_to_path_changeLogLevel() {
HttpAction.ActionRegistry actionRegistry = mock(HttpAction.ActionRegistry.class);
underTest.register(actionRegistry);
}
private static class DumbCeConfiguration implements CeConfiguration {
+ @Override
+ public void refresh() {
+ throw new UnsupportedOperationException("Refresh is not implemented");
+ }
+
@Override
public int getWorkerMaxCount() {
return WORKER_MAX_COUNT;
import java.util.Random;
import org.junit.Rule;
import org.junit.Test;
+import org.sonar.api.utils.log.LogTester;
+import org.sonar.api.utils.log.LoggerLevel;
import org.sonar.ce.configuration.CeConfigurationRule;
import static org.assertj.core.api.Assertions.assertThat;
public class EnabledCeWorkerControllerImplTest {
private Random random = new Random();
/** 1 <= workerCount <= 5 */
- private int workerCount = 1 + random.nextInt(5);
+ private int randomWorkerCount = 1 + random.nextInt(5);
@Rule
public CeConfigurationRule ceConfigurationRule = new CeConfigurationRule()
- .setWorkerCount(workerCount);
+ .setWorkerCount(randomWorkerCount);
+ @Rule
+ public LogTester logTester = new LogTester();
private CeWorker ceWorker = mock(CeWorker.class);
private EnabledCeWorkerControllerImpl underTest = new EnabledCeWorkerControllerImpl(ceConfigurationRule);
@Test
public void isEnabled_returns_true_if_worker_ordinal_is_less_than_CeConfiguration_workerCount() {
- int ordinal = workerCount + Math.min(-1, -random.nextInt(workerCount));
+ int ordinal = randomWorkerCount + Math.min(-1, -random.nextInt(randomWorkerCount));
when(ceWorker.getOrdinal()).thenReturn(ordinal);
assertThat(underTest.isEnabled(ceWorker))
- .as("For ordinal " + ordinal + " and workerCount " + workerCount)
+ .as("For ordinal " + ordinal + " and workerCount " + randomWorkerCount)
.isTrue();
}
@Test
public void isEnabled_returns_false_if_worker_ordinal_is_equal_to_CeConfiguration_workerCount() {
- when(ceWorker.getOrdinal()).thenReturn(workerCount);
+ when(ceWorker.getOrdinal()).thenReturn(randomWorkerCount);
assertThat(underTest.isEnabled(ceWorker)).isFalse();
}
when(ceWorker.getOrdinal()).thenReturn(ordinal);
assertThat(underTest.isEnabled(ceWorker))
- .as("For invalid ordinal " + ordinal + " and workerCount " + workerCount)
+ .as("For invalid ordinal " + ordinal + " and workerCount " + randomWorkerCount)
.isTrue();
}
+ @Test
+ public void constructor_writes_no_info_log_if_workerCount_is_1() {
+ ceConfigurationRule.setWorkerCount(1);
+ logTester.clear();
+
+ new EnabledCeWorkerControllerImpl(ceConfigurationRule);
+
+ assertThat(logTester.logs()).isEmpty();
+ }
+
+ @Test
+ public void constructor_writes_info_log_if_workerCount_is_greater_than_1() {
+ int newWorkerCount = randomWorkerCount + 1;
+ ceConfigurationRule.setWorkerCount(newWorkerCount);
+ logTester.clear();
+
+ new EnabledCeWorkerControllerImpl(ceConfigurationRule);
+
+ verifyInfoLog(newWorkerCount);
+ }
+
@Test
public void workerCount_is_loaded_in_constructor() {
- when(ceWorker.getOrdinal()).thenReturn(workerCount);
+ when(ceWorker.getOrdinal()).thenReturn(randomWorkerCount);
assertThat(underTest.isEnabled(ceWorker)).isFalse();
- ceConfigurationRule.setWorkerCount(workerCount + 1);
+ ceConfigurationRule.setWorkerCount(randomWorkerCount + 1);
+ assertThat(underTest.isEnabled(ceWorker)).isFalse();
+ }
+
+ @Test
+ public void refresh_reloads_workerCount() {
+ when(ceWorker.getOrdinal()).thenReturn(randomWorkerCount);
assertThat(underTest.isEnabled(ceWorker)).isFalse();
+ ceConfigurationRule.setRefreshCallHook((rule) -> rule.setWorkerCount(randomWorkerCount + 1));
+
+ underTest.refresh();
+
+ assertThat(underTest.isEnabled(ceWorker)).isTrue();
+ }
+
+ @Test
+ public void refresh_writes_info_log_if_workerCount_is_greater_than_1() {
+ logTester.clear();
+ int newWorkerCount = randomWorkerCount + 1;
+ ceConfigurationRule.setRefreshCallHook((rule) -> rule.setWorkerCount(newWorkerCount));
+
+ underTest.refresh();
+
+ verifyInfoLog(newWorkerCount);
+ }
+
+ @Test
+ public void refresh_writes_no_info_log_if_workerCount_is_1() {
+ logTester.clear();
+ ceConfigurationRule.setRefreshCallHook((rule) -> rule.setWorkerCount(1));
+
+ underTest.refresh();
+
+ assertThat(logTester.logs()).isEmpty();
+ }
+
+ private void verifyInfoLog(int workerCount) {
+ assertThat(logTester.logs()).hasSize(1);
+ assertThat(logTester.logs(LoggerLevel.INFO))
+ .containsOnly("Compute Engine will use " + workerCount + " concurrent workers to process tasks");
}
}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import fi.iki.elonen.NanoHTTPD;
+import org.junit.Test;
+import org.sonar.ce.httpd.HttpAction;
+
+import static fi.iki.elonen.NanoHTTPD.Method.GET;
+import static fi.iki.elonen.NanoHTTPD.Method.POST;
+import static fi.iki.elonen.NanoHTTPD.Response.Status.METHOD_NOT_ALLOWED;
+import static fi.iki.elonen.NanoHTTPD.Response.Status.OK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.sonar.ce.httpd.CeHttpUtils.createHttpSession;
+
+public class RefreshWorkerCountActionTest {
+ private EnabledCeWorkerController enabledCeWorkerController = mock(EnabledCeWorkerController.class);
+ private RefreshWorkerCountAction underTest = new RefreshWorkerCountAction(enabledCeWorkerController);
+
+ @Test
+ public void register_to_path_changeLogLevel() {
+ HttpAction.ActionRegistry actionRegistry = mock(HttpAction.ActionRegistry.class);
+
+ underTest.register(actionRegistry);
+
+ verify(actionRegistry).register("refreshWorkerCount", underTest);
+ }
+
+ @Test
+ public void serves_METHOD_NOT_ALLOWED_error_when_method_is_not_POST() {
+ NanoHTTPD.Response response = underTest.serve(createHttpSession(GET));
+
+ assertThat(response.getStatus()).isEqualTo(METHOD_NOT_ALLOWED);
+ verifyZeroInteractions(enabledCeWorkerController);
+ }
+
+ @Test
+ public void call_EnabledCeWorkerController_refresh_on_POST() {
+ NanoHTTPD.Response response = underTest.serve(createHttpSession(POST));
+
+ assertThat(response.getStatus()).isEqualTo(OK);
+ verify(enabledCeWorkerController).refresh();
+ verifyNoMoreInteractions(enabledCeWorkerController);
+ }
+}
}
}
+ public void refreshCeWorkerCount() {
+ call(RefreshCeWorkerCountActionClient.INSTANCE);
+ }
+
+ private enum RefreshCeWorkerCountActionClient implements ActionClient<Void> {
+ INSTANCE;
+
+ @Override
+ public String getPath() {
+ return "refreshWorkerCount";
+ }
+
+ @Override
+ public Void getDefault() {
+ return null;
+ }
+
+ @Override
+ public Void call(String url) throws Exception {
+ okhttp3.Request request = new okhttp3.Request.Builder()
+ .post(RequestBody.create(null, new byte[0]))
+ .url(url)
+ .build();
+ okhttp3.Response response = new OkHttpClient().newCall(request).execute();
+ if (response.code() != 200) {
+ throw new IOException(
+ String.format(
+ "Failed to trigger refresh of CE Worker count. Code was '%s' and response was '%s' for url '%s'",
+ response.code(),
+ response.body().string(),
+ url));
+ }
+ return null;
+ }
+ }
+
private <T> T call(ActionClient<T> actionClient) {
try (DefaultProcessCommands commands = DefaultProcessCommands.secondary(ipcSharedDir, COMPUTE_ENGINE.getIpcIndex())) {
if (commands.isUp()) {
underTest.changeLogLevel(LoggerLevel.INFO);
}
+ @Test
+ public void refreshCeWorkerCount_throws_ISE_if_http_error() {
+ String message = "blah";
+ server.enqueue(new MockResponse().setResponseCode(500).setBody(message));
+ // initialize registration of process
+ setUpWithHttpUrl(ProcessId.COMPUTE_ENGINE);
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Failed to call HTTP server of process " + ProcessId.COMPUTE_ENGINE);
+ expectedException.expectCause(hasType(IOException.class)
+ .andMessage(format("Failed to trigger refresh of CE Worker count. Code was '500' and response was 'blah' for url " +
+ "'http://%s:%s/refreshWorkerCount'", server.getHostName(), server.getPort())));
+
+ underTest.refreshCeWorkerCount();
+ }
+
+ @Test
+ public void refreshCeWorkerCount_does_not_fail_when_http_code_is_200() {
+ server.enqueue(new MockResponse().setResponseCode(200));
+
+ setUpWithHttpUrl(ProcessId.COMPUTE_ENGINE);
+
+ underTest.refreshCeWorkerCount();
+ }
+
private void setUpWithHttpUrl(ProcessId processId) {
try (DefaultProcessCommands processCommands = DefaultProcessCommands.secondary(ipcSharedDir, processId.getIpcIndex())) {
processCommands.setUp();