]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-19084 periodically schedule the GitHub sync task.
authorWojtek Wajerowicz <115081248+wojciech-wajerowicz-sonarsource@users.noreply.github.com>
Thu, 27 Apr 2023 13:28:22 +0000 (15:28 +0200)
committersonartech <sonartech@sonarsource.com>
Thu, 11 May 2023 20:03:13 +0000 (20:03 +0000)
server/sonar-ce-common/src/it/java/org/sonar/ce/queue/CeQueueImplIT.java
server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueue.java
server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java
server/sonar-db-dao/src/main/java/org/sonar/db/Pagination.java
server/sonar-server-common/src/main/java/org/sonar/server/util/AbstractStoppableScheduledExecutorServiceImpl.java
server/sonar-webserver-core/src/main/java/org/sonar/server/telemetry/TelemetryDaemon.java

index 0823582b04de0f9f581ef30c747ecbbd4414e8e1..d89b20a568bf9a463e3980f628e5ffabeb44ee2b 100644 (file)
@@ -56,6 +56,7 @@ import static org.assertj.core.api.Assertions.tuple;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_MAIN_COMPONENT;
+import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_TASK_TYPE;
 
 public class CeQueueImplIT {
 
@@ -208,6 +209,22 @@ public class CeQueueImplIT {
       .contains(task.getUuid());
   }
 
+  @Test
+  public void submit_with_UNIQUE_QUEUE_PER_TASK_TYPE_does_not_create_task_when_there_is_a_task_with_the_same_type() {
+    String mainComponentUuid = randomAlphabetic(5);
+    CeTaskSubmit taskSubmit = createTaskSubmit("some type", newComponent(mainComponentUuid), null);
+    String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5))
+      .mapToObj(i -> insertPendingInQueue(newComponent(mainComponentUuid)))
+      .map(CeQueueDto::getUuid)
+      .toArray(String[]::new);
+    Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_TASK_TYPE);
+
+    assertThat(task).isEmpty();
+    assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+      .extracting(CeQueueDto::getUuid)
+      .containsOnly(uuids);
+  }
+
   @Test
   public void massSubmit_returns_tasks_for_each_CeTaskSubmit_populated_from_CeTaskSubmit_and_creates_CeQueue_row_for_each() {
     String mainComponentUuid = randomAlphabetic(10);
index 10b649a12987cf8ad2fb3b8d7ed1694a4b9c7984..adee3c00e93789cdfb85cf7db826abbde74426a9 100644 (file)
@@ -124,7 +124,8 @@ public interface CeQueue {
   int clear();
 
   enum SubmitOption {
-    UNIQUE_QUEUE_PER_MAIN_COMPONENT
+    UNIQUE_QUEUE_PER_MAIN_COMPONENT,
+    UNIQUE_QUEUE_PER_TASK_TYPE
   }
 
   enum WorkersPauseStatus {
index b124b509c02154b4cffdd64eb487138722598b03..e0af41a2682c326fa28eb0a2952b1d8969eabb11 100644 (file)
@@ -46,6 +46,7 @@ import org.sonar.db.DbSession;
 import org.sonar.db.ce.CeActivityDto;
 import org.sonar.db.ce.CeQueueDto;
 import org.sonar.db.ce.CeTaskCharacteristicDto;
+import org.sonar.db.ce.CeTaskQuery;
 import org.sonar.db.ce.DeleteIf;
 import org.sonar.db.component.ComponentDto;
 import org.sonar.db.user.UserDto;
@@ -55,7 +56,6 @@ import org.sonar.server.property.InternalProperties;
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singleton;
-import static java.util.Optional.of;
 import static java.util.Optional.ofNullable;
 import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_MAIN_COMPONENT;
 import static org.sonar.core.util.stream.MoreCollectors.toEnumSet;
@@ -95,24 +95,39 @@ public class CeQueueImpl implements CeQueue {
 
   private Optional<CeTask> submit(CeTaskSubmit submission, EnumSet<SubmitOption> submitOptions) {
     try (DbSession dbSession = dbClient.openSession(false)) {
-      if (submitOptions.contains(UNIQUE_QUEUE_PER_MAIN_COMPONENT)
-        && submission.getComponent()
-          .map(component -> dbClient.ceQueueDao().countByStatusAndMainComponentUuid(dbSession, PENDING, component.getMainComponentUuid()) > 0)
-          .orElse(false)) {
-        return Optional.empty();
-      }
-      CeQueueDto taskDto = addToQueueInDb(dbSession, submission);
+      Optional<CeTask> ceTask = submit(dbSession, submission, submitOptions);
       dbSession.commit();
+      return ceTask;
+    }
+  }
 
-      Map<String, ComponentDto> componentsByUuid = loadComponentDtos(dbSession, taskDto);
-      if (componentsByUuid.isEmpty()) {
-        return of(convertToTask(dbSession, taskDto, submission.getCharacteristics(), null, null));
+  private Optional<CeTask> submit(DbSession dbSession, CeTaskSubmit submission, EnumSet<SubmitOption> submitOptions) {
+    CeTaskQuery query = new CeTaskQuery();
+    for (SubmitOption option : submitOptions) {
+      switch (option) {
+        case UNIQUE_QUEUE_PER_MAIN_COMPONENT -> submission.getComponent()
+          .flatMap(component -> Optional.ofNullable(component.getMainComponentUuid()))
+          .ifPresent(mainComponentUuid -> query.setMainComponentUuid(mainComponentUuid).setStatuses(List.of(PENDING.name())));
+        case UNIQUE_QUEUE_PER_TASK_TYPE -> query.setType(submission.getType());
       }
+    }
+
+    boolean queryNonEmpty = query.getMainComponentUuids() != null || query.getStatuses() != null || query.getType() != null;
+    if (queryNonEmpty && dbClient.ceQueueDao().countByQuery(dbSession, query) > 0) {
+      return Optional.empty();
+    }
+    CeQueueDto inserted = addToQueueInDb(dbSession, submission);
+    return Optional.of(convertQueueDtoToTask(dbSession, inserted, submission));
+  }
 
-      return of(convertToTask(dbSession, taskDto, submission.getCharacteristics(),
-        ofNullable(taskDto.getComponentUuid()).map(componentsByUuid::get).orElse(null),
-        ofNullable(taskDto.getMainComponentUuid()).map(componentsByUuid::get).orElse(null)));
+  private CeTask convertQueueDtoToTask(DbSession dbSession, CeQueueDto queueDto, CeTaskSubmit submission) {
+    Map<String, ComponentDto> componentsByUuid = loadComponentDtos(dbSession, queueDto);
+    if (componentsByUuid.isEmpty()) {
+      return convertToTask(dbSession, queueDto, submission.getCharacteristics(), null, null);
     }
+    return convertToTask(dbSession, queueDto, submission.getCharacteristics(),
+      ofNullable(queueDto.getComponentUuid()).map(componentsByUuid::get).orElse(null),
+      ofNullable(queueDto.getMainComponentUuid()).map(componentsByUuid::get).orElse(null));
   }
 
   Map<String, ComponentDto> loadComponentDtos(DbSession dbSession, CeQueueDto taskDto) {
@@ -183,7 +198,7 @@ public class CeQueueImpl implements CeQueue {
     return Arrays.stream(options).collect(toEnumSet(SubmitOption.class));
   }
 
-  private CeQueueDto addToQueueInDb(DbSession dbSession, CeTaskSubmit submission) {
+  private void insertCharacteristics(DbSession dbSession, CeTaskSubmit submission) {
     for (Map.Entry<String, String> characteristic : submission.getCharacteristics().entrySet()) {
       CeTaskCharacteristicDto characteristicDto = new CeTaskCharacteristicDto();
       characteristicDto.setUuid(uuidFactory.create());
@@ -192,6 +207,10 @@ public class CeQueueImpl implements CeQueue {
       characteristicDto.setValue(characteristic.getValue());
       dbClient.ceTaskCharacteristicsDao().insert(dbSession, characteristicDto);
     }
+  }
+
+  private CeQueueDto addToQueueInDb(DbSession dbSession, CeTaskSubmit submission) {
+    insertCharacteristics(dbSession, submission);
 
     CeQueueDto dto = new CeQueueDto();
     dto.setUuid(submission.getUuid());
@@ -262,7 +281,7 @@ public class CeQueueImpl implements CeQueue {
     updateExecutionFields(activityDto);
     remove(dbSession, task, activityDto);
   }
-  
+
   protected long updateExecutionFields(CeActivityDto activityDto) {
     Long startedAt = activityDto.getStartedAt();
     if (startedAt == null) {
@@ -354,7 +373,6 @@ public class CeQueueImpl implements CeQueue {
       .setCharacteristics(characteristics)
       .setSubmitter(resolveSubmitter(dbSession, taskDto.getSubmitterUuid()));
 
-
     String componentUuid = taskDto.getComponentUuid();
     if (component != null) {
       builder.setComponent(new CeTask.Component(component.uuid(), component.getKey(), component.name()));
index d4a68eaddaff0158460bcb8da9302dab4f86281e..bf1f260add7265f0935cd7f38cea5ee5acabad59 100644 (file)
@@ -27,6 +27,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 public final class Pagination {
   private static final Pagination ALL = new Builder(1).andSize(Integer.MAX_VALUE);
 
+  private static final Pagination FIRST = new Builder(1).andSize(1);
+
   private final int page;
   private final int pageSize;
 
@@ -39,6 +41,10 @@ public final class Pagination {
     return ALL;
   }
 
+  public static Pagination first() {
+    return FIRST;
+  }
+
   public static Builder forPage(int page) {
     return new Builder(page);
   }
index 7699f9842d613b03533acdbe20d82752dc722346..f4ab38e716a1bf3547d285d1c726edd1331f6cb0 100644 (file)
@@ -24,9 +24,9 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
-public class AbstractStoppableScheduledExecutorServiceImpl<T extends ScheduledExecutorService> extends AbstractStoppableExecutorService<T>
+public abstract class AbstractStoppableScheduledExecutorServiceImpl<T extends ScheduledExecutorService> extends AbstractStoppableExecutorService<T>
   implements StoppableScheduledExecutorService {
-  public AbstractStoppableScheduledExecutorServiceImpl(T delegate) {
+  protected AbstractStoppableScheduledExecutorServiceImpl(T delegate) {
     super(delegate);
   }
 
index 2c423c1d14a04af93a1c1360f9194330462d72c2..974219952bac0950e190f13be6969210dd8b62e5 100644 (file)
@@ -27,7 +27,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import org.sonar.api.Startable;
 import org.sonar.api.config.Configuration;
 import org.sonar.api.server.ServerSide;
 import org.sonar.api.utils.System2;
@@ -35,6 +34,7 @@ import org.sonar.api.utils.log.Logger;
 import org.sonar.api.utils.log.Loggers;
 import org.sonar.api.utils.text.JsonWriter;
 import org.sonar.server.property.InternalProperties;
+import org.sonar.server.util.AbstractStoppableScheduledExecutorServiceImpl;
 import org.sonar.server.util.GlobalLockManager;
 
 import static org.sonar.process.ProcessProperties.Property.SONAR_TELEMETRY_ENABLE;
@@ -42,7 +42,7 @@ import static org.sonar.process.ProcessProperties.Property.SONAR_TELEMETRY_FREQU
 import static org.sonar.process.ProcessProperties.Property.SONAR_TELEMETRY_URL;
 
 @ServerSide
-public class TelemetryDaemon implements Startable {
+public class TelemetryDaemon extends AbstractStoppableScheduledExecutorServiceImpl<ScheduledExecutorService> {
   private static final String THREAD_NAME_PREFIX = "sq-telemetry-service-";
   private static final int ONE_DAY = 24 * 60 * 60 * 1_000;
   private static final String I_PROP_LAST_PING = "telemetry.lastPing";
@@ -60,10 +60,9 @@ public class TelemetryDaemon implements Startable {
   private final InternalProperties internalProperties;
   private final System2 system2;
 
-  private ScheduledExecutorService executorService;
-
   public TelemetryDaemon(TelemetryDataLoader dataLoader, TelemetryDataJsonWriter dataJsonWriter, TelemetryClient telemetryClient, Configuration config,
     InternalProperties internalProperties, GlobalLockManager lockManager, System2 system2) {
+    super(Executors.newSingleThreadScheduledExecutor(newThreadFactory()));
     this.dataLoader = dataLoader;
     this.dataJsonWriter = dataJsonWriter;
     this.telemetryClient = telemetryClient;
@@ -90,22 +89,8 @@ public class TelemetryDaemon implements Startable {
       return;
     }
     LOG.info("Sharing of SonarQube statistics is enabled.");
-    executorService = Executors.newSingleThreadScheduledExecutor(newThreadFactory());
     int frequencyInSeconds = frequency();
-    executorService.scheduleWithFixedDelay(telemetryCommand(), frequencyInSeconds, frequencyInSeconds, TimeUnit.SECONDS);
-  }
-
-  @Override
-  public void stop() {
-    try {
-      if (executorService == null) {
-        return;
-      }
-      executorService.shutdown();
-      executorService.awaitTermination(5, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
+    scheduleWithFixedDelay(telemetryCommand(), frequencyInSeconds, frequencyInSeconds, TimeUnit.SECONDS);
   }
 
   private static ThreadFactory newThreadFactory() {