@@ -19,12 +19,13 @@ | |||
*/ | |||
package org.sonar.ce; | |||
import org.sonar.ce.container.ComputeEngineStatus; | |||
import org.sonar.ce.container.ComputeEngineContainer; | |||
import org.sonar.process.Props; | |||
import static com.google.common.base.Preconditions.checkState; | |||
public class ComputeEngineImpl implements ComputeEngine { | |||
public class ComputeEngineImpl implements ComputeEngine, ComputeEngineStatus { | |||
private final Props props; | |||
private final ComputeEngineContainer computeEngineContainer; | |||
@@ -33,6 +34,7 @@ public class ComputeEngineImpl implements ComputeEngine { | |||
public ComputeEngineImpl(Props props, ComputeEngineContainer computeEngineContainer) { | |||
this.props = props; | |||
this.computeEngineContainer = computeEngineContainer; | |||
computeEngineContainer.setComputeEngineStatus(this); | |||
} | |||
@Override | |||
@@ -63,7 +65,8 @@ public class ComputeEngineImpl implements ComputeEngine { | |||
checkState(currentStatus.ordinal() <= Status.STOPPING.ordinal(), "shutdown() can not be called multiple times"); | |||
} | |||
private enum Status { | |||
INIT, STARTING, STARTED, STOPPING, STOPPED | |||
@Override | |||
public Status getStatus() { | |||
return status; | |||
} | |||
} |
@@ -22,6 +22,8 @@ package org.sonar.ce.container; | |||
import org.sonar.process.Props; | |||
public interface ComputeEngineContainer { | |||
void setComputeEngineStatus(ComputeEngineStatus computeEngineStatus); | |||
ComputeEngineContainer start(Props props); | |||
ComputeEngineContainer stop(); |
@@ -148,20 +148,29 @@ import org.sonar.server.view.index.ViewIndex; | |||
import org.sonar.server.view.index.ViewIndexer; | |||
import org.sonarqube.ws.Rules; | |||
import static java.util.Objects.requireNonNull; | |||
public class ComputeEngineContainerImpl implements ComputeEngineContainer { | |||
private ComputeEngineStatus computeEngineStatus; | |||
@CheckForNull | |||
private ComponentContainer level1; | |||
@CheckForNull | |||
private ComponentContainer level4; | |||
@Override | |||
public void setComputeEngineStatus(ComputeEngineStatus computeEngineStatus) { | |||
this.computeEngineStatus = computeEngineStatus; | |||
} | |||
@Override | |||
public ComputeEngineContainer start(Props props) { | |||
this.level1 = new ComponentContainer(); | |||
this.level1 | |||
.add(props.rawProperties()) | |||
.add(level1Components()) | |||
.add(toArray(CorePropertyDefinitions.all())); | |||
.add(toArray(CorePropertyDefinitions.all())) | |||
.add(requireNonNull(computeEngineStatus)); | |||
configureFromModules(this.level1); | |||
this.level1.startComponents(); | |||
@@ -0,0 +1,32 @@ | |||
/* | |||
* SonarQube | |||
* Copyright (C) 2009-2017 SonarSource SA | |||
* mailto:info AT sonarsource DOT com | |||
* | |||
* This program is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* This program is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.ce.container; | |||
public interface ComputeEngineStatus { | |||
/** | |||
* This status will be consumed by multiple processes, hence the implementation of the method must be threadsafe | |||
*/ | |||
Status getStatus(); | |||
enum Status { | |||
INIT, STARTING, STARTED, STOPPING, STOPPED; | |||
} | |||
} |
@@ -33,6 +33,7 @@ import org.apache.log4j.Logger; | |||
import org.sonar.api.ce.ComputeEngineSide; | |||
import org.sonar.api.utils.System2; | |||
import org.sonar.api.utils.log.Loggers; | |||
import org.sonar.ce.container.ComputeEngineStatus; | |||
import org.sonar.ce.monitoring.CEQueueStatus; | |||
import org.sonar.core.util.UuidFactory; | |||
import org.sonar.db.DbClient; | |||
@@ -54,23 +55,25 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue | |||
private final System2 system2; | |||
private final DbClient dbClient; | |||
private final CEQueueStatus queueStatus; | |||
private final ComputeEngineStatus computeEngineStatus; | |||
// state | |||
private AtomicBoolean peekPaused = new AtomicBoolean(false); | |||
public InternalCeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory, CEQueueStatus queueStatus, | |||
DefaultOrganizationProvider defaultOrganizationProvider) { | |||
DefaultOrganizationProvider defaultOrganizationProvider, ComputeEngineStatus computeEngineStatus) { | |||
super(dbClient, uuidFactory, defaultOrganizationProvider); | |||
this.system2 = system2; | |||
this.dbClient = dbClient; | |||
this.queueStatus = queueStatus; | |||
this.computeEngineStatus = computeEngineStatus; | |||
} | |||
@Override | |||
public Optional<CeTask> peek(String workerUuid) { | |||
requireNonNull(workerUuid, "workerUuid can't be null"); | |||
if (peekPaused.get()) { | |||
if (peekPaused.get() || computeEngineStatus.getStatus() != ComputeEngineStatus.Status.STARTED) { | |||
return Optional.empty(); | |||
} | |||
try (DbSession dbSession = dbClient.openSession(false)) { |
@@ -23,6 +23,7 @@ import java.util.Properties; | |||
import org.junit.Rule; | |||
import org.junit.Test; | |||
import org.junit.rules.ExpectedException; | |||
import org.sonar.ce.container.ComputeEngineStatus; | |||
import org.sonar.ce.container.ComputeEngineContainer; | |||
import org.sonar.process.Props; | |||
@@ -63,6 +64,10 @@ public class ComputeEngineImplTest { | |||
} | |||
private static class NoOpComputeEngineContainer implements ComputeEngineContainer { | |||
@Override | |||
public void setComputeEngineStatus(ComputeEngineStatus computeEngineStatus) { | |||
} | |||
@Override | |||
public ComputeEngineContainer start(Props props) { | |||
return this; |
@@ -27,6 +27,7 @@ import java.util.Date; | |||
import java.util.Properties; | |||
import java.util.stream.Collectors; | |||
import org.apache.commons.dbcp.BasicDataSource; | |||
import org.junit.Before; | |||
import org.junit.Rule; | |||
import org.junit.Test; | |||
import org.junit.rules.TemporaryFolder; | |||
@@ -49,6 +50,7 @@ import org.sonar.process.Props; | |||
import static java.lang.String.valueOf; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.mockito.Mockito.mock; | |||
import static org.sonar.process.ProcessEntryPoint.PROPERTY_PROCESS_INDEX; | |||
import static org.sonar.process.ProcessEntryPoint.PROPERTY_SHARED_PATH; | |||
import static org.sonar.process.ProcessProperties.PATH_DATA; | |||
@@ -65,7 +67,13 @@ public class ComputeEngineContainerImplTest { | |||
@Rule | |||
public DbTester dbTester = DbTester.create(System2.INSTANCE); | |||
private ComputeEngineContainerImpl underTest = new ComputeEngineContainerImpl(); | |||
private ComputeEngineContainerImpl underTest; | |||
@Before | |||
public void setUp() throws Exception { | |||
underTest = new ComputeEngineContainerImpl(); | |||
underTest.setComputeEngineStatus(mock(ComputeEngineStatus.class)); | |||
} | |||
@Test | |||
public void constructor_does_not_create_container() { | |||
@@ -139,6 +147,7 @@ public class ComputeEngineContainerImplTest { | |||
+ 47 // content of DaoModule | |||
+ 3 // content of EsSearchModule | |||
+ 61 // content of CorePropertyDefinitions | |||
+ 1 // StopFlagContainer | |||
); | |||
assertThat( | |||
picoContainer.getComponentAdapters().stream() |
@@ -33,6 +33,7 @@ import org.junit.Test; | |||
import org.junit.rules.ExpectedException; | |||
import org.sonar.api.utils.System2; | |||
import org.sonar.api.utils.internal.AlwaysIncreasingSystem2; | |||
import org.sonar.ce.container.ComputeEngineStatus; | |||
import org.sonar.ce.monitoring.CEQueueStatus; | |||
import org.sonar.ce.monitoring.CEQueueStatusImpl; | |||
import org.sonar.core.util.UuidFactory; | |||
@@ -53,6 +54,8 @@ import static java.util.Arrays.asList; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.mockito.Mockito.mock; | |||
import static org.mockito.Mockito.when; | |||
import static org.sonar.ce.container.ComputeEngineStatus.Status.STARTED; | |||
import static org.sonar.ce.container.ComputeEngineStatus.Status.STOPPING; | |||
public class InternalCeQueueImplTest { | |||
@@ -72,7 +75,8 @@ public class InternalCeQueueImplTest { | |||
private UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE; | |||
private CEQueueStatus queueStatus = new CEQueueStatusImpl(dbTester.getDbClient()); | |||
private DefaultOrganizationProvider defaultOrganizationProvider = mock(DefaultOrganizationProvider.class); | |||
private InternalCeQueue underTest = new InternalCeQueueImpl(system2, dbTester.getDbClient(), uuidFactory, queueStatus, defaultOrganizationProvider); | |||
private ComputeEngineStatus computeEngineStatus = mock(ComputeEngineStatus.class); | |||
private InternalCeQueue underTest = new InternalCeQueueImpl(system2, dbTester.getDbClient(), uuidFactory, queueStatus, defaultOrganizationProvider, computeEngineStatus); | |||
@Before | |||
public void setUp() throws Exception { | |||
@@ -84,6 +88,7 @@ public class InternalCeQueueImplTest { | |||
.setCreatedAt(defaultOrganization.getCreatedAt()) | |||
.setUpdatedAt(defaultOrganization.getUpdatedAt()) | |||
.build()); | |||
when(computeEngineStatus.getStatus()).thenReturn(STARTED); | |||
} | |||
@Test | |||
@@ -303,6 +308,15 @@ public class InternalCeQueueImplTest { | |||
assertThat(peek.isPresent()).isFalse(); | |||
} | |||
@Test | |||
public void peek_nothing_if_application_status_stopping() throws Exception { | |||
submit(CeTaskTypes.REPORT, "PROJECT_1"); | |||
when(computeEngineStatus.getStatus()).thenReturn(STOPPING); | |||
Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); | |||
assertThat(peek.isPresent()).isFalse(); | |||
} | |||
@Test | |||
public void peek_peeks_pending_tasks_with_executionCount_equal_to_0_and_increases_it() { | |||
dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto() |