import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_MAIN_COMPONENT;
+import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_TASK_TYPE;
public class CeQueueImplIT {
.contains(task.getUuid());
}
+ @Test
+ public void submit_with_UNIQUE_QUEUE_PER_TASK_TYPE_does_not_create_task_when_there_is_a_task_with_the_same_type() {
+ String mainComponentUuid = randomAlphabetic(5);
+ CeTaskSubmit taskSubmit = createTaskSubmit("some type", newComponent(mainComponentUuid), null);
+ String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5))
+ .mapToObj(i -> insertPendingInQueue(newComponent(mainComponentUuid)))
+ .map(CeQueueDto::getUuid)
+ .toArray(String[]::new);
+ Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_TASK_TYPE);
+
+ assertThat(task).isEmpty();
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(uuids);
+ }
+
@Test
public void massSubmit_returns_tasks_for_each_CeTaskSubmit_populated_from_CeTaskSubmit_and_creates_CeQueue_row_for_each() {
String mainComponentUuid = randomAlphabetic(10);
import org.sonar.db.ce.CeActivityDto;
import org.sonar.db.ce.CeQueueDto;
import org.sonar.db.ce.CeTaskCharacteristicDto;
+import org.sonar.db.ce.CeTaskQuery;
import org.sonar.db.ce.DeleteIf;
import org.sonar.db.component.ComponentDto;
import org.sonar.db.user.UserDto;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
-import static java.util.Optional.of;
import static java.util.Optional.ofNullable;
import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_MAIN_COMPONENT;
import static org.sonar.core.util.stream.MoreCollectors.toEnumSet;
private Optional<CeTask> submit(CeTaskSubmit submission, EnumSet<SubmitOption> submitOptions) {
try (DbSession dbSession = dbClient.openSession(false)) {
- if (submitOptions.contains(UNIQUE_QUEUE_PER_MAIN_COMPONENT)
- && submission.getComponent()
- .map(component -> dbClient.ceQueueDao().countByStatusAndMainComponentUuid(dbSession, PENDING, component.getMainComponentUuid()) > 0)
- .orElse(false)) {
- return Optional.empty();
- }
- CeQueueDto taskDto = addToQueueInDb(dbSession, submission);
+ Optional<CeTask> ceTask = submit(dbSession, submission, submitOptions);
dbSession.commit();
+ return ceTask;
+ }
+ }
- Map<String, ComponentDto> componentsByUuid = loadComponentDtos(dbSession, taskDto);
- if (componentsByUuid.isEmpty()) {
- return of(convertToTask(dbSession, taskDto, submission.getCharacteristics(), null, null));
+ private Optional<CeTask> submit(DbSession dbSession, CeTaskSubmit submission, EnumSet<SubmitOption> submitOptions) {
+ CeTaskQuery query = new CeTaskQuery();
+ for (SubmitOption option : submitOptions) {
+ switch (option) {
+ case UNIQUE_QUEUE_PER_MAIN_COMPONENT -> submission.getComponent()
+ .flatMap(component -> Optional.ofNullable(component.getMainComponentUuid()))
+ .ifPresent(mainComponentUuid -> query.setMainComponentUuid(mainComponentUuid).setStatuses(List.of(PENDING.name())));
+ case UNIQUE_QUEUE_PER_TASK_TYPE -> query.setType(submission.getType());
}
+ }
+
+ boolean queryNonEmpty = query.getMainComponentUuids() != null || query.getStatuses() != null || query.getType() != null;
+ if (queryNonEmpty && dbClient.ceQueueDao().countByQuery(dbSession, query) > 0) {
+ return Optional.empty();
+ }
+ CeQueueDto inserted = addToQueueInDb(dbSession, submission);
+ return Optional.of(convertQueueDtoToTask(dbSession, inserted, submission));
+ }
- return of(convertToTask(dbSession, taskDto, submission.getCharacteristics(),
- ofNullable(taskDto.getComponentUuid()).map(componentsByUuid::get).orElse(null),
- ofNullable(taskDto.getMainComponentUuid()).map(componentsByUuid::get).orElse(null)));
+ private CeTask convertQueueDtoToTask(DbSession dbSession, CeQueueDto queueDto, CeTaskSubmit submission) {
+ Map<String, ComponentDto> componentsByUuid = loadComponentDtos(dbSession, queueDto);
+ if (componentsByUuid.isEmpty()) {
+ return convertToTask(dbSession, queueDto, submission.getCharacteristics(), null, null);
}
+ return convertToTask(dbSession, queueDto, submission.getCharacteristics(),
+ ofNullable(queueDto.getComponentUuid()).map(componentsByUuid::get).orElse(null),
+ ofNullable(queueDto.getMainComponentUuid()).map(componentsByUuid::get).orElse(null));
}
Map<String, ComponentDto> loadComponentDtos(DbSession dbSession, CeQueueDto taskDto) {
return Arrays.stream(options).collect(toEnumSet(SubmitOption.class));
}
- private CeQueueDto addToQueueInDb(DbSession dbSession, CeTaskSubmit submission) {
+ private void insertCharacteristics(DbSession dbSession, CeTaskSubmit submission) {
for (Map.Entry<String, String> characteristic : submission.getCharacteristics().entrySet()) {
CeTaskCharacteristicDto characteristicDto = new CeTaskCharacteristicDto();
characteristicDto.setUuid(uuidFactory.create());
characteristicDto.setValue(characteristic.getValue());
dbClient.ceTaskCharacteristicsDao().insert(dbSession, characteristicDto);
}
+ }
+
+ private CeQueueDto addToQueueInDb(DbSession dbSession, CeTaskSubmit submission) {
+ insertCharacteristics(dbSession, submission);
CeQueueDto dto = new CeQueueDto();
dto.setUuid(submission.getUuid());
updateExecutionFields(activityDto);
remove(dbSession, task, activityDto);
}
-
+
protected long updateExecutionFields(CeActivityDto activityDto) {
Long startedAt = activityDto.getStartedAt();
if (startedAt == null) {
.setCharacteristics(characteristics)
.setSubmitter(resolveSubmitter(dbSession, taskDto.getSubmitterUuid()));
-
String componentUuid = taskDto.getComponentUuid();
if (component != null) {
builder.setComponent(new CeTask.Component(component.uuid(), component.getKey(), component.name()));
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import org.sonar.api.Startable;
import org.sonar.api.config.Configuration;
import org.sonar.api.server.ServerSide;
import org.sonar.api.utils.System2;
import org.sonar.api.utils.log.Loggers;
import org.sonar.api.utils.text.JsonWriter;
import org.sonar.server.property.InternalProperties;
+import org.sonar.server.util.AbstractStoppableScheduledExecutorServiceImpl;
import org.sonar.server.util.GlobalLockManager;
import static org.sonar.process.ProcessProperties.Property.SONAR_TELEMETRY_ENABLE;
import static org.sonar.process.ProcessProperties.Property.SONAR_TELEMETRY_URL;
@ServerSide
-public class TelemetryDaemon implements Startable {
+public class TelemetryDaemon extends AbstractStoppableScheduledExecutorServiceImpl<ScheduledExecutorService> {
private static final String THREAD_NAME_PREFIX = "sq-telemetry-service-";
private static final int ONE_DAY = 24 * 60 * 60 * 1_000;
private static final String I_PROP_LAST_PING = "telemetry.lastPing";
private final InternalProperties internalProperties;
private final System2 system2;
- private ScheduledExecutorService executorService;
-
public TelemetryDaemon(TelemetryDataLoader dataLoader, TelemetryDataJsonWriter dataJsonWriter, TelemetryClient telemetryClient, Configuration config,
InternalProperties internalProperties, GlobalLockManager lockManager, System2 system2) {
+ super(Executors.newSingleThreadScheduledExecutor(newThreadFactory()));
this.dataLoader = dataLoader;
this.dataJsonWriter = dataJsonWriter;
this.telemetryClient = telemetryClient;
return;
}
LOG.info("Sharing of SonarQube statistics is enabled.");
- executorService = Executors.newSingleThreadScheduledExecutor(newThreadFactory());
int frequencyInSeconds = frequency();
- executorService.scheduleWithFixedDelay(telemetryCommand(), frequencyInSeconds, frequencyInSeconds, TimeUnit.SECONDS);
- }
-
- @Override
- public void stop() {
- try {
- if (executorService == null) {
- return;
- }
- executorService.shutdown();
- executorService.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ scheduleWithFixedDelay(telemetryCommand(), frequencyInSeconds, frequencyInSeconds, TimeUnit.SECONDS);
}
private static ThreadFactory newThreadFactory() {