import java.util.List;
import org.picocontainer.Startable;
import org.sonar.api.ce.ComputeEngineSide;
+import org.sonar.api.config.Configuration;
import org.sonar.api.platform.ServerUpgradeStatus;
import org.sonar.api.utils.log.Logger;
import org.sonar.api.utils.log.Loggers;
import org.sonar.db.DbClient;
import org.sonar.db.DbSession;
+import org.sonar.process.ProcessProperties;
/**
- * Cleans-up the Compute Engine queue and resets the JMX counters.
+ * Cleans-up the Compute Engine queue.
* CE workers must not be started before execution of this class.
*/
@ComputeEngineSide
private final DbClient dbClient;
private final ServerUpgradeStatus serverUpgradeStatus;
private final InternalCeQueue queue;
+ private final Configuration configuration;
- public CeQueueCleaner(DbClient dbClient, ServerUpgradeStatus serverUpgradeStatus, InternalCeQueue queue) {
+ public CeQueueCleaner(DbClient dbClient, ServerUpgradeStatus serverUpgradeStatus, InternalCeQueue queue, Configuration configuration) {
this.dbClient = dbClient;
this.serverUpgradeStatus = serverUpgradeStatus;
this.queue = queue;
+ this.configuration = configuration;
}
@Override
public void start() {
- if (serverUpgradeStatus.isUpgraded()) {
+ if (serverUpgradeStatus.isUpgraded() && !isBlueGreenDeployment()) {
cleanOnUpgrade();
} else {
- try (DbSession dbSession = dbClient.openSession(false)) {
- verifyConsistency(dbSession);
- }
+ cleanUpTaskInputOrphans();
}
}
+ private boolean isBlueGreenDeployment() {
+ return configuration.getBoolean(ProcessProperties.Property.BLUE_GREEN_ENABLED.getKey()).orElse(false);
+ }
+
private void cleanOnUpgrade() {
// we assume that pending tasks are not compatible with the new version
// and can't be processed
queue.clear();
}
- private void verifyConsistency(DbSession dbSession) {
- // Reports that have been processed are not kept in database yet.
- // They are supposed to be systematically dropped.
- // Let's clean-up orphans if any.
- List<String> uuids = dbClient.ceTaskInputDao().selectUuidsNotInQueue(dbSession);
- dbClient.ceTaskInputDao().deleteByUuids(dbSession, uuids);
- dbSession.commit();
+ private void cleanUpTaskInputOrphans() {
+ try (DbSession dbSession = dbClient.openSession(false)) {
+ // Reports that have been processed are not kept in database yet.
+ // They are supposed to be systematically dropped.
+ // Let's clean-up orphans if any.
+ List<String> uuids = dbClient.ceTaskInputDao().selectUuidsNotInQueue(dbSession);
+ dbClient.ceTaskInputDao().deleteByUuids(dbSession, uuids);
+ dbSession.commit();
+ }
}
@Override
*/
package org.sonar.ce.queue;
-import java.io.IOException;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.junit.Rule;
import org.junit.Test;
+import org.sonar.api.config.internal.MapSettings;
import org.sonar.api.platform.ServerUpgradeStatus;
import org.sonar.api.utils.System2;
import org.sonar.db.DbTester;
import org.sonar.db.ce.CeQueueDto;
import org.sonar.db.ce.CeTaskInputDao;
import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.process.ProcessProperties;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
private ServerUpgradeStatus serverUpgradeStatus = mock(ServerUpgradeStatus.class);
private InternalCeQueue queue = mock(InternalCeQueue.class);
- private CeQueueCleaner underTest = new CeQueueCleaner(dbTester.getDbClient(), serverUpgradeStatus, queue);
+ private MapSettings settings = new MapSettings();
@Test
- public void start_does_not_reset_in_progress_tasks_to_pending() throws IOException {
+ public void start_does_not_reset_in_progress_tasks_to_pending() {
insertInQueue("TASK_1", CeQueueDto.Status.PENDING);
insertInQueue("TASK_2", CeQueueDto.Status.IN_PROGRESS);
- underTest.start();
+ runCleaner();
assertThat(dbTester.getDbClient().ceQueueDao().countByStatus(dbTester.getSession(), CeQueueDto.Status.PENDING)).isEqualTo(1);
assertThat(dbTester.getDbClient().ceQueueDao().countByStatus(dbTester.getSession(), CeQueueDto.Status.IN_PROGRESS)).isEqualTo(1);
public void start_clears_queue_if_version_upgrade() {
when(serverUpgradeStatus.isUpgraded()).thenReturn(true);
- underTest.start();
+ runCleaner();
verify(queue).clear();
}
@Test
- public void start_deletes_orphan_report_files() throws Exception {
+ public void start_does_not_clear_queue_if_version_upgrade_but_blue_green_deployment() {
+ when(serverUpgradeStatus.isUpgraded()).thenReturn(true);
+ settings.setProperty(ProcessProperties.Property.BLUE_GREEN_ENABLED.getKey(), true);
+
+ runCleaner();
+
+ verify(queue, never()).clear();
+ }
+
+ @Test
+ public void start_deletes_orphan_report_files() {
// analysis reports are persisted but the associated
// task is not in the queue
insertInQueue("TASK_1", CeQueueDto.Status.PENDING);
insertTaskData("TASK_1");
insertTaskData("TASK_2");
- underTest.start();
+ runCleaner();
CeTaskInputDao dataDao = dbTester.getDbClient().ceTaskInputDao();
Optional<CeTaskInputDao.DataStream> task1Data = dataDao.selectData(dbTester.getSession(), "TASK_1");
dbTester.getDbClient().ceTaskInputDao().insert(dbTester.getSession(), taskUuid, IOUtils.toInputStream("{binary}"));
dbTester.getSession().commit();
}
+
+ private void runCleaner() {
+ CeQueueCleaner cleaner = new CeQueueCleaner(dbTester.getDbClient(), serverUpgradeStatus, queue, settings.asConfig());
+ cleaner.start();
+ }
}