]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-6834 complete filtering paramters of api/ce/activity
authorSimon Brandhof <simon.brandhof@sonarsource.com>
Fri, 18 Sep 2015 09:33:18 +0000 (11:33 +0200)
committerSimon Brandhof <simon.brandhof@sonarsource.com>
Sat, 19 Sep 2015 06:56:33 +0000 (08:56 +0200)
37 files changed:
server/sonar-server/src/main/java/org/sonar/server/computation/CeQueue.java
server/sonar-server/src/main/java/org/sonar/server/computation/CeQueueCleaner.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/computation/CeQueueImpl.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/computation/CeQueueInitializer.java
server/sonar-server/src/main/java/org/sonar/server/computation/CeTask.java
server/sonar-server/src/main/java/org/sonar/server/computation/CeTaskSubmit.java [deleted file]
server/sonar-server/src/main/java/org/sonar/server/computation/ReportFiles.java
server/sonar-server/src/main/java/org/sonar/server/computation/ReportProcessingScheduler.java
server/sonar-server/src/main/java/org/sonar/server/computation/ReportProcessingSchedulerExecutorService.java
server/sonar-server/src/main/java/org/sonar/server/computation/ReportSubmitter.java
server/sonar-server/src/main/java/org/sonar/server/computation/TaskSubmission.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/computation/ws/CeActivityWsAction.java
server/sonar-server/src/main/java/org/sonar/server/computation/ws/CeSubmitWsAction.java
server/sonar-server/src/main/java/org/sonar/server/platform/platformlevel/PlatformLevel4.java
server/sonar-server/src/main/java/org/sonar/server/util/AbstractStoppableExecutorService.java
server/sonar-server/src/main/resources/org/sonar/server/computation/ws/CeActivityWsAction/example.json
server/sonar-server/src/test/java/org/sonar/server/computation/CeQueueCleanerTest.java [new file with mode: 0644]
server/sonar-server/src/test/java/org/sonar/server/computation/CeQueueImplTest.java [new file with mode: 0644]
server/sonar-server/src/test/java/org/sonar/server/computation/CeQueueInitializerTest.java
server/sonar-server/src/test/java/org/sonar/server/computation/CeQueueTest.java [deleted file]
server/sonar-server/src/test/java/org/sonar/server/computation/CeWorkerImplTest.java
server/sonar-server/src/test/java/org/sonar/server/computation/ReportProcessingSchedulerTest.java
server/sonar-server/src/test/java/org/sonar/server/computation/ReportSubmitterTest.java
server/sonar-server/src/test/java/org/sonar/server/computation/TestTaskSubmission.java [new file with mode: 0644]
server/sonar-server/src/test/java/org/sonar/server/computation/monitoring/ComputeEngineQueueMonitorTest.java
server/sonar-server/src/test/java/org/sonar/server/computation/ws/CeActivityWsActionTest.java [new file with mode: 0644]
server/sonar-server/src/test/java/org/sonar/server/computation/ws/CeSubmitWsActionTest.java
server/sonar-server/src/test/java/org/sonar/server/computation/ws/CeWsTest.java
server/sonar-web/src/main/webapp/WEB-INF/db/migrate/931_create_ce_activity.rb
sonar-batch/src/main/java/org/sonar/batch/util/BatchUtils.java
sonar-batch/src/test/java/org/sonar/batch/util/BatchUtilsTest.java [new file with mode: 0644]
sonar-db/src/main/java/org/sonar/db/ce/CeActivityDao.java
sonar-db/src/main/java/org/sonar/db/ce/CeActivityMapper.java
sonar-db/src/main/java/org/sonar/db/ce/CeActivityQuery.java
sonar-db/src/main/resources/org/sonar/db/ce/CeActivityMapper.xml
sonar-db/src/main/resources/org/sonar/db/version/schema-h2.ddl
sonar-db/src/test/java/org/sonar/db/ce/CeActivityDaoTest.java

index 4dfab3e76f5276529889b029ce50e0818095e7d2..fd8864b0fe38217c338cc0b334fbd110283e7b85 100644 (file)
 package org.sonar.server.computation;
 
 import com.google.common.base.Optional;
-import org.sonar.api.server.ServerSide;
-import org.sonar.api.utils.System2;
-import org.sonar.core.util.UuidFactory;
-import org.sonar.db.DbClient;
-import org.sonar.db.DbSession;
 import org.sonar.db.ce.CeActivityDto;
-import org.sonar.db.ce.CeQueueDto;
-import org.sonar.server.computation.monitoring.CEQueueStatus;
-
-import static java.lang.String.format;
 
 /**
  * Queue of pending Compute Engine tasks. Both producer and consumer actions
@@ -38,96 +29,40 @@ import static java.lang.String.format;
  *   This class is decoupled from the regular task type {@link org.sonar.db.ce.CeTaskTypes#REPORT}.
  * </p>
  */
-@ServerSide
-public class CeQueue {
-
-  private final System2 system2;
-  private final DbClient dbClient;
-  private final UuidFactory uuidFactory;
-  private final CEQueueStatus queueStatus;
-  private final CeQueueListener[] listeners;
-
-  // state
-  private boolean submitPaused = false;
-  private boolean peekPaused = false;
-
-  public CeQueue(System2 system2, DbClient dbClient, UuidFactory uuidFactory,
-    CEQueueStatus queueStatus, CeQueueListener[] listeners) {
-    this.system2 = system2;
-    this.dbClient = dbClient;
-    this.uuidFactory = uuidFactory;
-    this.queueStatus = queueStatus;
-    this.listeners = listeners;
-  }
-
-  public CeTaskSubmit prepareSubmit() {
-    return new CeTaskSubmit(uuidFactory.create());
-  }
-
-  public CeTask submit(CeTaskSubmit submit) {
-    if (submitPaused) {
-      throw new IllegalStateException("Compute Engine does not currently accept new tasks");
-    }
-    CeTask task = new CeTask(submit);
-    DbSession dbSession = dbClient.openSession(false);
-    try {
-      CeQueueDto dto = new CeQueueDto();
-      dto.setUuid(task.getUuid());
-      dto.setTaskType(task.getType());
-      dto.setComponentUuid(task.getComponentUuid());
-      dto.setStatus(CeQueueDto.Status.PENDING);
-      dto.setSubmitterLogin(task.getSubmitterLogin());
-      dto.setStartedAt(null);
-      dbClient.ceQueueDao().insert(dbSession, dto);
-      dbSession.commit();
-      queueStatus.addReceived();
-      return task;
-    } finally {
-      dbClient.closeSession(dbSession);
-    }
-  }
-
-  public Optional<CeTask> peek() {
-    if (peekPaused) {
-      return Optional.absent();
-    }
-    DbSession dbSession = dbClient.openSession(false);
-    try {
-      Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession);
-      if (!dto.isPresent()) {
-        return Optional.absent();
-      }
-      queueStatus.addInProgress();
-      return Optional.of(new CeTask(dto.get()));
-
-    } finally {
-      dbClient.closeSession(dbSession);
-    }
-  }
+public interface CeQueue {
+  /**
+   * Build an instance of {@link TaskSubmission} required for {@link #submit(TaskSubmission)}. It allows
+   * to enforce that task ids are generated by the queue. It's used also for having access
+   * to the id before submitting the task to the queue.
+   */
+  TaskSubmission prepareSubmit();
 
-  public boolean cancel(String taskUuid) {
-    DbSession dbSession = dbClient.openSession(false);
-    try {
-      Optional<CeQueueDto> queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, taskUuid);
-      if (queueDto.isPresent()) {
-        if (!queueDto.get().getStatus().equals(CeQueueDto.Status.PENDING)) {
-          throw new IllegalStateException(String.format("Task is in progress and can't be cancelled [uuid=%s]", taskUuid));
-        }
-        cancel(dbSession, queueDto.get());
-        return true;
-      }
-      return false;
-    } finally {
-      dbClient.closeSession(dbSession);
-    }
-  }
+  /**
+   * Submits a task to the queue. The task is processed asynchronously.
+   * If submits are paused (see {@link #isSubmitPaused()}, then an
+   * unchecked exception is thrown.
+   */
+  CeTask submit(TaskSubmission submission);
 
-  void cancel(DbSession dbSession, CeQueueDto q) {
-    CeActivityDto activityDto = new CeActivityDto(q);
-    activityDto.setStatus(CeActivityDto.Status.CANCELED);
-    remove(dbSession, new CeTask(q), q, activityDto);
-  }
+  /**
+   * Peek the oldest task in status {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}.
+   * The task status is changed to {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}.
+   * Does not return anything if the queue is paused (see {@link #isPeekPaused()}.
+   *
+   * <p>Only a single task can be peeked by project.</p>
+   *
+   * <p>An unchecked exception may be thrown on technical errors (db connection, ...).</p>
+   */
+  Optional<CeTask> peek();
 
+  /**
+   * Cancels a task in status {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}. An unchecked
+   * exception is thrown if the status is not {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}.
+   * The method does nothing and returns {@code false} if the task does not exist.
+   *
+   * @return true if the task exists and is successfully canceled.
+   */
+  boolean cancel(String taskUuid);
 
   /**
    * Removes all the tasks from the queue, whatever their status. They are marked
@@ -137,9 +72,7 @@ public class CeQueue {
    *
    * @return the number of canceled tasks
    */
-  public int clear() {
-    return cancelAll(true);
-  }
+  int clear();
 
   /**
    * Similar as {@link #clear()}, except that the tasks with status
@@ -148,83 +81,25 @@ public class CeQueue {
    *
    * @return the number of canceled tasks
    */
-  public int cancelAll() {
-    return cancelAll(false);
-  }
+  int cancelAll();
 
-  private int cancelAll(boolean includeInProgress) {
-    int count = 0;
-    DbSession dbSession = dbClient.openSession(false);
-    try {
-      for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) {
-        if (includeInProgress || !queueDto.getStatus().equals(CeQueueDto.Status.IN_PROGRESS)) {
-          cancel(dbSession, queueDto);
-          count++;
-        }
-      }
-      return count;
-    } finally {
-      dbClient.closeSession(dbSession);
-    }
-  }
-
-  public void remove(CeTask task, CeActivityDto.Status status) {
-    DbSession dbSession = dbClient.openSession(false);
-    try {
-      Optional<CeQueueDto> queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, task.getUuid());
-      if (!queueDto.isPresent()) {
-        throw new IllegalStateException(format("Task does not exist anymore: %s", task));
-      }
-      CeActivityDto activityDto = new CeActivityDto(queueDto.get());
-      activityDto.setStatus(status);
-      Long startedAt = activityDto.getStartedAt();
-      if (startedAt != null) {
-        activityDto.setFinishedAt(system2.now());
-        long executionTime = activityDto.getFinishedAt() - startedAt;
-        activityDto.setExecutionTimeMs(executionTime);
-        if (status == CeActivityDto.Status.SUCCESS) {
-          queueStatus.addSuccess(executionTime);
-        } else {
-          queueStatus.addError(executionTime);
-        }
-      }
-      remove(dbSession, task, queueDto.get(), activityDto);
-
-    } finally {
-      dbClient.closeSession(dbSession);
-    }
-  }
-
-  private void remove(DbSession dbSession, CeTask task, CeQueueDto queueDto, CeActivityDto activityDto) {
-    dbClient.ceActivityDao().insert(dbSession, activityDto);
-    dbClient.ceQueueDao().deleteByUuid(dbSession, queueDto.getUuid());
-    dbSession.commit();
-    for (CeQueueListener listener : listeners) {
-      listener.onRemoved(task, activityDto.getStatus());
-    }
-  }
+  /**
+   * Removes a task from the queue and registers it to past activities. This method
+   * is called by Compute Engine workers when task is processed.
+   *
+   * <p>An unchecked exception is thrown if the task does not exist in the queue.</p>
+   */
+  void remove(CeTask task, CeActivityDto.Status status);
 
-  public void pauseSubmit() {
-    this.submitPaused = true;
-  }
+  void pauseSubmit();
 
-  public void resumeSubmit() {
-    this.submitPaused = false;
-  }
+  void resumeSubmit();
 
-  public boolean isSubmitPaused() {
-    return submitPaused;
-  }
+  boolean isSubmitPaused();
 
-  public void pausePeek() {
-    this.peekPaused = true;
-  }
+  void pausePeek();
 
-  public void resumePeek() {
-    this.peekPaused = false;
-  }
+  void resumePeek();
 
-  public boolean isPeekPaused() {
-    return peekPaused;
-  }
+  boolean isPeekPaused();
 }
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/CeQueueCleaner.java b/server/sonar-server/src/main/java/org/sonar/server/computation/CeQueueCleaner.java
new file mode 100644 (file)
index 0000000..7b67aa3
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.sonar.api.platform.ServerUpgradeStatus;
+import org.sonar.api.server.ServerSide;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.db.DbClient;
+import org.sonar.db.DbSession;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.db.ce.CeTaskTypes;
+
+/**
+ * Cleans-up the Compute Engine queue and resets the JMX counters.
+ * CE workers must not be started before execution of this class.
+ */
+@ServerSide
+public class CeQueueCleaner {
+
+  private static final Logger LOGGER = Loggers.get(CeQueueCleaner.class);
+
+  private final DbClient dbClient;
+  private final ServerUpgradeStatus serverUpgradeStatus;
+  private final ReportFiles reportFiles;
+  private final CeQueueImpl queue;
+
+  public CeQueueCleaner(DbClient dbClient, ServerUpgradeStatus serverUpgradeStatus, ReportFiles reportFiles, CeQueueImpl queue) {
+    this.dbClient = dbClient;
+    this.serverUpgradeStatus = serverUpgradeStatus;
+    this.reportFiles = reportFiles;
+    this.queue = queue;
+  }
+
+  public void clean(DbSession dbSession) {
+    if (serverUpgradeStatus.isUpgraded()) {
+      cleanOnUpgrade();
+    } else {
+      verifyConsistency(dbSession);
+    }
+  }
+
+  private void cleanOnUpgrade() {
+    // we assume that pending tasks are not compatible with the new version
+    // and can't be processed
+    LOGGER.info("Cancel all pending tasks (due to upgrade)");
+    queue.clear();
+  }
+
+  private void verifyConsistency(DbSession dbSession) {
+    // server is not being upgraded
+    dbClient.ceQueueDao().resetAllToPendingStatus(dbSession);
+    dbSession.commit();
+
+    // verify that the report files are available for the tasks in queue
+    Set<String> uuidsInQueue = new HashSet<>();
+    for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) {
+      uuidsInQueue.add(queueDto.getUuid());
+      if (CeTaskTypes.REPORT.equals(queueDto.getTaskType()) && !reportFiles.fileForUuid(queueDto.getUuid()).exists()) {
+        // the report is not available on file system
+        queue.cancel(dbSession, queueDto);
+      }
+    }
+
+    // clean-up filesystem
+    for (String uuid : reportFiles.listUuids()) {
+      if (!uuidsInQueue.contains(uuid)) {
+        reportFiles.deleteIfExists(uuid);
+      }
+    }
+  }
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/CeQueueImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/CeQueueImpl.java
new file mode 100644 (file)
index 0000000..dd4c849
--- /dev/null
@@ -0,0 +1,285 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.sonar.api.server.ServerSide;
+import org.sonar.api.utils.System2;
+import org.sonar.core.util.UuidFactory;
+import org.sonar.db.DbClient;
+import org.sonar.db.DbSession;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.server.computation.monitoring.CEQueueStatus;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.lang.String.format;
+
+@ServerSide
+public class CeQueueImpl implements CeQueue {
+
+  private final System2 system2;
+  private final DbClient dbClient;
+  private final UuidFactory uuidFactory;
+  private final CEQueueStatus queueStatus;
+  private final CeQueueListener[] listeners;
+
+  // state
+  private AtomicBoolean submitPaused = new AtomicBoolean(false);
+  private AtomicBoolean peekPaused = new AtomicBoolean(false);
+
+  public CeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory,
+    CEQueueStatus queueStatus, CeQueueListener[] listeners) {
+    this.system2 = system2;
+    this.dbClient = dbClient;
+    this.uuidFactory = uuidFactory;
+    this.queueStatus = queueStatus;
+    this.listeners = listeners;
+  }
+
+  @Override
+  public TaskSubmission prepareSubmit() {
+    return new TaskSubmissionImpl(uuidFactory.create());
+  }
+
+  @Override
+  public CeTask submit(TaskSubmission submission) {
+    checkArgument(!Strings.isNullOrEmpty(submission.getUuid()));
+    checkArgument(!Strings.isNullOrEmpty(submission.getType()));
+    checkArgument(submission instanceof TaskSubmissionImpl);
+    checkState(!submitPaused.get(), "Compute Engine does not currently accept new tasks");
+
+    CeTask task = new CeTask(submission);
+    DbSession dbSession = dbClient.openSession(false);
+    try {
+      CeQueueDto dto = createQueueDtoForSubmit(task);
+      dbClient.ceQueueDao().insert(dbSession, dto);
+      dbSession.commit();
+      queueStatus.addReceived();
+      return task;
+    } finally {
+      dbClient.closeSession(dbSession);
+    }
+  }
+
+  private CeQueueDto createQueueDtoForSubmit(CeTask task) {
+    CeQueueDto dto = new CeQueueDto();
+    dto.setUuid(task.getUuid());
+    dto.setTaskType(task.getType());
+    dto.setComponentUuid(task.getComponentUuid());
+    dto.setStatus(CeQueueDto.Status.PENDING);
+    dto.setSubmitterLogin(task.getSubmitterLogin());
+    dto.setStartedAt(null);
+    return dto;
+  }
+
+  @Override
+  public Optional<CeTask> peek() {
+    if (peekPaused.get()) {
+      return Optional.absent();
+    }
+    DbSession dbSession = dbClient.openSession(false);
+    try {
+      Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession);
+      CeTask task = null;
+      if (dto.isPresent()) {
+        queueStatus.addInProgress();
+        task = new CeTask(dto.get());
+      }
+      return Optional.fromNullable(task);
+
+    } finally {
+      dbClient.closeSession(dbSession);
+    }
+  }
+
+  @Override
+  public boolean cancel(String taskUuid) {
+    DbSession dbSession = dbClient.openSession(false);
+    try {
+      Optional<CeQueueDto> queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, taskUuid);
+      if (queueDto.isPresent()) {
+        checkState(CeQueueDto.Status.PENDING.equals(queueDto.get().getStatus()), "Task is in progress and can't be canceled [uuid=%s]", taskUuid);
+        cancel(dbSession, queueDto.get());
+        return true;
+      }
+      return false;
+    } finally {
+      dbClient.closeSession(dbSession);
+    }
+  }
+
+  void cancel(DbSession dbSession, CeQueueDto q) {
+    CeActivityDto activityDto = new CeActivityDto(q);
+    activityDto.setStatus(CeActivityDto.Status.CANCELED);
+    remove(dbSession, new CeTask(q), q, activityDto);
+  }
+
+  @Override
+  public int clear() {
+    return cancelAll(true);
+  }
+
+  @Override
+  public int cancelAll() {
+    return cancelAll(false);
+  }
+
+  private int cancelAll(boolean includeInProgress) {
+    int count = 0;
+    DbSession dbSession = dbClient.openSession(false);
+    try {
+      for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) {
+        if (includeInProgress || !queueDto.getStatus().equals(CeQueueDto.Status.IN_PROGRESS)) {
+          cancel(dbSession, queueDto);
+          count++;
+        }
+      }
+      return count;
+    } finally {
+      dbClient.closeSession(dbSession);
+    }
+  }
+
+  @Override
+  public void remove(CeTask task, CeActivityDto.Status status) {
+    DbSession dbSession = dbClient.openSession(false);
+    try {
+      Optional<CeQueueDto> queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, task.getUuid());
+      if (!queueDto.isPresent()) {
+        throw new IllegalStateException(format("Task does not exist anymore: %s", task));
+      }
+      CeActivityDto activityDto = new CeActivityDto(queueDto.get());
+      activityDto.setStatus(status);
+      updateQueueStatus(status, activityDto);
+      remove(dbSession, task, queueDto.get(), activityDto);
+
+    } finally {
+      dbClient.closeSession(dbSession);
+    }
+  }
+
+  private void updateQueueStatus(CeActivityDto.Status status, CeActivityDto activityDto) {
+    Long startedAt = activityDto.getStartedAt();
+    if (startedAt == null) {
+      return;
+    }
+    activityDto.setFinishedAt(system2.now());
+    long executionTime = activityDto.getFinishedAt() - startedAt;
+    activityDto.setExecutionTimeMs(executionTime);
+    if (status == CeActivityDto.Status.SUCCESS) {
+      queueStatus.addSuccess(executionTime);
+    } else {
+      queueStatus.addError(executionTime);
+    }
+  }
+
+  private void remove(DbSession dbSession, CeTask task, CeQueueDto queueDto, CeActivityDto activityDto) {
+    dbClient.ceActivityDao().insert(dbSession, activityDto);
+    dbClient.ceQueueDao().deleteByUuid(dbSession, queueDto.getUuid());
+    dbSession.commit();
+    for (CeQueueListener listener : listeners) {
+      listener.onRemoved(task, activityDto.getStatus());
+    }
+  }
+
+  @Override
+  public void pauseSubmit() {
+    this.submitPaused.set(true);
+  }
+
+  @Override
+  public void resumeSubmit() {
+    this.submitPaused.set(false);
+  }
+
+  @Override
+  public boolean isSubmitPaused() {
+    return submitPaused.get();
+  }
+
+  @Override
+  public void pausePeek() {
+    this.peekPaused.set(true);
+  }
+
+  @Override
+  public void resumePeek() {
+    this.peekPaused.set(false);
+  }
+
+  @Override
+  public boolean isPeekPaused() {
+    return peekPaused.get();
+  }
+
+  private static class TaskSubmissionImpl implements TaskSubmission {
+    private final String uuid;
+    private String type;
+    private String componentUuid;
+    private String submitterLogin;
+
+    private TaskSubmissionImpl(String uuid) {
+      this.uuid = uuid;
+    }
+
+    @Override
+    public String getUuid() {
+      return uuid;
+    }
+
+    @Override
+    public String getType() {
+      return type;
+    }
+
+    @Override
+    public TaskSubmission setType(@Nullable String s) {
+      this.type = s;
+      return this;
+    }
+
+    @Override
+    public String getComponentUuid() {
+      return componentUuid;
+    }
+
+    @Override
+    public TaskSubmission setComponentUuid(@Nullable String s) {
+      this.componentUuid = s;
+      return this;
+    }
+
+    @Override
+    public String getSubmitterLogin() {
+      return submitterLogin;
+    }
+
+    @Override
+    public TaskSubmission setSubmitterLogin(@Nullable String s) {
+      this.submitterLogin = s;
+      return this;
+    }
+  }
+}
index 661e9ba680d65f48a5c53b76c16c624c568265ee..f05fa05bfcb21e12e108550dac163bb22842e880 100644 (file)
  */
 package org.sonar.server.computation;
 
-import java.util.HashSet;
-import java.util.Set;
-import org.sonar.api.platform.ServerUpgradeStatus;
+import org.picocontainer.Startable;
 import org.sonar.api.server.ServerSide;
-import org.sonar.api.utils.log.Logger;
-import org.sonar.api.utils.log.Loggers;
 import org.sonar.db.DbClient;
 import org.sonar.db.DbSession;
-import org.sonar.db.ce.CeQueueDto;
-import org.sonar.db.ce.CeTaskTypes;
 import org.sonar.server.computation.monitoring.CEQueueStatus;
 
 /**
- * Cleans-up the Compute Engine queue and resets the JMX counters.
- * CE workers must not be started before execution of this class.
+ * Cleans-up the queue, initializes JMX counters then schedule
+ * the execution of workers. That allows to not prevent workers
+ * from peeking the queue before it's ready.
  */
 @ServerSide
-public class CeQueueInitializer {
-
-  private static final Logger LOGGER = Loggers.get(CeQueueInitializer.class);
+public class CeQueueInitializer implements Startable {
 
   private final DbClient dbClient;
-  private final ServerUpgradeStatus serverUpgradeStatus;
-  private final ReportFiles reportFiles;
-  private final CeQueue queue;
   private final CEQueueStatus queueStatus;
+  private final CeQueueCleaner cleaner;
+  private final ReportProcessingScheduler scheduler;
 
-  public CeQueueInitializer(DbClient dbClient, ServerUpgradeStatus serverUpgradeStatus, ReportFiles reportFiles,
-    CeQueue queue, CEQueueStatus queueStatus) {
+  public CeQueueInitializer(DbClient dbClient, CEQueueStatus queueStatus, CeQueueCleaner cleaner, ReportProcessingScheduler scheduler) {
     this.dbClient = dbClient;
-    this.serverUpgradeStatus = serverUpgradeStatus;
-    this.reportFiles = reportFiles;
-    this.queue = queue;
     this.queueStatus = queueStatus;
+    this.cleaner = cleaner;
+    this.scheduler = scheduler;
   }
 
-  /**
-   * Do not rename. Used at server startup.
-   */
+  @Override
   public void start() {
     DbSession dbSession = dbClient.openSession(false);
     try {
       initJmxCounters(dbSession);
-
-      if (serverUpgradeStatus.isUpgraded()) {
-        cleanOnUpgrade();
-      } else {
-        verifyConsistency(dbSession);
-      }
+      cleaner.clean(dbSession);
+      scheduler.schedule();
 
     } finally {
       dbClient.closeSession(dbSession);
     }
   }
 
-  private void cleanOnUpgrade() {
-    // we assume that pending tasks are not compatible with the new version
-    // and can't be processed
-    LOGGER.info("Cancel all pending tasks (due to upgrade)");
-    queue.clear();
-  }
-
-  private void verifyConsistency(DbSession dbSession) {
-    // server is not being upgraded
-    dbClient.ceQueueDao().resetAllToPendingStatus(dbSession);
-    dbSession.commit();
-
-    // verify that the report files are available for the tasks in queue
-    Set<String> uuidsInQueue = new HashSet<>();
-    for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) {
-      uuidsInQueue.add(queueDto.getUuid());
-      if (CeTaskTypes.REPORT.equals(queueDto.getTaskType()) && !reportFiles.fileForUuid(queueDto.getUuid()).exists()) {
-        // the report is not available on file system
-        queue.cancel(dbSession, queueDto);
-      }
-    }
-
-    // clean-up filesystem
-    for (String uuid : reportFiles.listUuids()) {
-      if (!uuidsInQueue.contains(uuid)) {
-        reportFiles.deleteIfExists(uuid);
-      }
-    }
+  @Override
+  public void stop() {
+    // nothing to do
   }
 
   private void initJmxCounters(DbSession dbSession) {
index 38284022f10ade66f01700ebb8f0a69887accf86..172f5ba6c5a1610bc00b82e723d4ddda4b14b6b0 100644 (file)
@@ -25,6 +25,8 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.Immutable;
 import org.sonar.db.ce.CeQueueDto;
 
+import static java.util.Objects.requireNonNull;
+
 @Immutable
 public class CeTask {
 
@@ -34,24 +36,18 @@ public class CeTask {
   private final String submitterLogin;
 
   public CeTask(String uuid, String type, @Nullable String componentUuid, @Nullable String submitterLogin) {
-    this.uuid = uuid;
-    this.type = type;
+    this.uuid = requireNonNull(uuid);
+    this.type = requireNonNull(type);
     this.componentUuid = componentUuid;
     this.submitterLogin = submitterLogin;
   }
 
-  CeTask(CeTaskSubmit submit) {
-    this.uuid = submit.getUuid();
-    this.type = submit.getType();
-    this.componentUuid = submit.getComponentUuid();
-    this.submitterLogin = submit.getSubmitterLogin();
+  CeTask(TaskSubmission submit) {
+    this(submit.getUuid(), submit.getType(), submit.getComponentUuid(), submit.getSubmitterLogin());
   }
 
   CeTask(CeQueueDto dto) {
-    this.uuid = dto.getUuid();
-    this.type = dto.getTaskType();
-    this.componentUuid = dto.getComponentUuid();
-    this.submitterLogin = dto.getSubmitterLogin();
+    this(dto.getUuid(), dto.getTaskType(), dto.getComponentUuid(), dto.getSubmitterLogin());
   }
 
   public String getUuid() {
@@ -83,7 +79,7 @@ public class CeTask {
   }
 
   @Override
-  public boolean equals(Object o) {
+  public boolean equals(@Nullable Object o) {
     if (this == o) {
       return true;
     }
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/CeTaskSubmit.java b/server/sonar-server/src/main/java/org/sonar/server/computation/CeTaskSubmit.java
deleted file mode 100644 (file)
index ab2c251..0000000
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.server.computation;
-
-public class CeTaskSubmit {
-
-  private final String uuid;
-  private String type;
-  private String componentUuid;
-  private String submitterLogin;
-
-  CeTaskSubmit(String uuid) {
-    this.uuid = uuid;
-  }
-
-  public String getUuid() {
-    return uuid;
-  }
-
-  public String getType() {
-    return type;
-  }
-
-  public void setType(String type) {
-    this.type = type;
-  }
-
-  public String getComponentUuid() {
-    return componentUuid;
-  }
-
-  public void setComponentUuid(String s) {
-    this.componentUuid = s;
-  }
-
-  public String getSubmitterLogin() {
-    return submitterLogin;
-  }
-
-  public void setSubmitterLogin(String s) {
-    this.submitterLogin = s;
-  }
-}
index 6b24dafdff9d0c6deed553c8a9dbfab45605dae8..a93fa71b4e53c5719f5080eab0d8c88805909d67 100644 (file)
@@ -45,7 +45,7 @@ public class ReportFiles {
     this.settings = settings;
   }
 
-  public void save(CeTaskSubmit taskSubmit, InputStream reportInput) {
+  public void save(TaskSubmission taskSubmit, InputStream reportInput) {
     File file = fileForUuid(taskSubmit.getUuid());
     try {
       FileUtils.copyInputStreamToFile(reportInput, file);
index 89c472b813a5ac7b91c32842b2c3e202de978535..de22bfda7504674c723de5f7489f29305218709c 100644 (file)
 package org.sonar.server.computation;
 
 import java.util.concurrent.TimeUnit;
-import org.sonar.api.platform.Server;
-import org.sonar.api.platform.ServerStartHandler;
 
 /**
  * Adds tasks to the Compute Engine to process batch reports.
  */
-public class ReportProcessingScheduler implements ServerStartHandler {
+public class ReportProcessingScheduler {
   private final ReportProcessingSchedulerExecutorService reportProcessingSchedulerExecutorService;
   private final ComputeEngineProcessingQueue processingQueue;
   private final CeWorker worker;
@@ -43,17 +41,12 @@ public class ReportProcessingScheduler implements ServerStartHandler {
     this.processingQueue = processingQueue;
     this.worker = worker;
 
-    this.delayBetweenTasks = 10;
+    this.delayBetweenTasks = 1;
     this.delayForFirstStart = 0;
     this.timeUnit = TimeUnit.SECONDS;
   }
 
-  public void startAnalysisTaskNow() {
-    reportProcessingSchedulerExecutorService.execute(new AddReportProcessingToCEProcessingQueue());
-  }
-
-  @Override
-  public void onServerStart(Server server) {
+  public void schedule() {
     reportProcessingSchedulerExecutorService.scheduleAtFixedRate(new AddReportProcessingToCEProcessingQueue(), delayForFirstStart, delayBetweenTasks, timeUnit);
   }
 
index fb9ecd576186d1e65691977efba3ca37d47842f8..85d12033446587fdac09ed9864d7315c3bc08db5 100644 (file)
@@ -22,7 +22,7 @@ package org.sonar.server.computation;
 import org.sonar.server.util.StoppableScheduledExecutorService;
 
 /**
- * ExecutorService responsible for adding {@link CeWorkerImpl} to {@link CeQueue} on a regular basis.
+ * ExecutorService responsible for adding {@link CeWorkerImpl} to {@link CeQueueImpl} on a regular basis.
  */
 public interface ReportProcessingSchedulerExecutorService extends StoppableScheduledExecutorService {
 }
index 6bb18ed6ae18924119470147b8f474a1b22f0d98..4ec0abf3be146ac9a48231de5b7ef38069679ebb 100644 (file)
@@ -63,7 +63,7 @@ public class ReportSubmitter {
     }
 
     // the report file must be saved before submitting the task
-    CeTaskSubmit submit = queue.prepareSubmit();
+    TaskSubmission submit = queue.prepareSubmit();
     reportFiles.save(submit, reportInput);
 
     submit.setType(CeTaskTypes.REPORT);
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/TaskSubmission.java b/server/sonar-server/src/main/java/org/sonar/server/computation/TaskSubmission.java
new file mode 100644 (file)
index 0000000..801bba9
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
+
+public interface TaskSubmission {
+
+  String getUuid();
+
+  String getType();
+
+  TaskSubmission setType(String s);
+
+  @CheckForNull
+  String getComponentUuid();
+
+  TaskSubmission setComponentUuid(@Nullable String s);
+
+  @CheckForNull
+  String getSubmitterLogin();
+
+  TaskSubmission setSubmitterLogin(@Nullable String s);
+
+}
index 700edb98ae9965de20c817a50a799083cca8b9a4..f10d7b4d0c4c7b3d267b6f0b00649f46e96135d7 100644 (file)
@@ -25,10 +25,12 @@ import org.sonar.api.server.ws.Request;
 import org.sonar.api.server.ws.Response;
 import org.sonar.api.server.ws.WebService;
 import org.sonar.api.web.UserRole;
+import org.sonar.core.util.Uuids;
 import org.sonar.db.DbClient;
 import org.sonar.db.DbSession;
 import org.sonar.db.ce.CeActivityDto;
 import org.sonar.db.ce.CeActivityQuery;
+import org.sonar.db.ce.CeTaskTypes;
 import org.sonar.server.user.UserSession;
 import org.sonar.server.ws.WsUtils;
 import org.sonarqube.ws.Common;
@@ -40,6 +42,11 @@ import org.sonarqube.ws.WsCe;
  */
 public class CeActivityWsAction implements CeWsAction {
 
+  private static final String PARAM_COMPONENT_ID = "componentId";
+  private static final String PARAM_TYPE = "type";
+  private static final String PARAM_STATUS = "status";
+  private static final String PARAM_ONLY_CURRENTS = "onlyCurrents";
+
   private final UserSession userSession;
   private final DbClient dbClient;
   private final CeWsTaskFormatter formatter;
@@ -52,32 +59,71 @@ public class CeActivityWsAction implements CeWsAction {
 
   @Override
   public void define(WebService.NewController controller) {
-    controller.createAction("activity")
+    WebService.NewAction action = controller.createAction("activity")
       .setInternal(true)
       .setResponseExample(getClass().getResource("CeActivityWsAction/example.json"))
       .setHandler(this);
+    action.createParam(PARAM_COMPONENT_ID)
+      .setDescription("Optional id of the component (project) to filter on")
+      .setExampleValue(Uuids.UUID_EXAMPLE_03);
+    action.createParam(PARAM_STATUS)
+      .setDescription("Optional filter on task status")
+      .setPossibleValues(CeActivityDto.Status.values());
+    action.createParam(PARAM_ONLY_CURRENTS)
+      .setDescription("Optional filter on the current activities (only the most recent task by project)")
+      .setBooleanPossibleValues()
+      .setDefaultValue("false");
+    action.createParam(PARAM_TYPE)
+      .setDescription("Optional filter on task type")
+      .setExampleValue(CeTaskTypes.REPORT);
+    action.addPagingParams(10);
   }
 
   @Override
   public void handle(Request wsRequest, Response wsResponse) throws Exception {
-    userSession.checkGlobalPermission(UserRole.ADMIN);
     DbSession dbSession = dbClient.openSession(false);
     try {
-      CeActivityQuery query = new CeActivityQuery();
-      List<CeActivityDto> dtos = dbClient.ceActivityDao().selectByQuery(dbSession, query, new RowBounds(0, 100));
+      CeActivityQuery query = readQuery(wsRequest);
+      RowBounds rowBounds = readMyBatisRowBounds(wsRequest);
+      List<CeActivityDto> dtos = dbClient.ceActivityDao().selectByQuery(dbSession, query, rowBounds);
+      int total = dbClient.ceActivityDao().countByQuery(dbSession, query);
 
       WsCe.ActivityResponse.Builder wsResponseBuilder = WsCe.ActivityResponse.newBuilder();
       wsResponseBuilder.addAllTasks(formatter.formatActivity(dbSession, dtos));
-      Common.Paging paging = Common.Paging.newBuilder()
-        .setPageIndex(1)
-        .setPageSize(dtos.size())
-        .setTotal(dtos.size())
-        .build();
-      wsResponseBuilder.setPaging(paging);
+      wsResponseBuilder.setPaging(Common.Paging.newBuilder()
+        .setPageIndex(wsRequest.mandatoryParamAsInt(WebService.Param.PAGE))
+        .setPageSize(wsRequest.mandatoryParamAsInt(WebService.Param.PAGE_SIZE))
+        .setTotal(total));
       WsUtils.writeProtobuf(wsResponseBuilder.build(), wsRequest, wsResponse);
 
     } finally {
       dbClient.closeSession(dbSession);
     }
   }
+
+  private CeActivityQuery readQuery(Request wsRequest) {
+    CeActivityQuery query = new CeActivityQuery();
+    query.setType(wsRequest.param(PARAM_TYPE));
+    query.setOnlyCurrents(wsRequest.mandatoryParamAsBoolean(PARAM_ONLY_CURRENTS));
+      
+    String status = wsRequest.param(PARAM_STATUS);
+    if (status != null) {
+      query.setStatus(CeActivityDto.Status.valueOf(status));
+    }
+    
+    String componentId = wsRequest.param(PARAM_COMPONENT_ID);
+    if (componentId == null) {
+      userSession.checkGlobalPermission(UserRole.ADMIN);
+    } else {
+      userSession.checkProjectUuidPermission(UserRole.ADMIN, componentId);
+      query.setComponentUuid(componentId);
+    }
+    return query;
+  }
+
+  private static RowBounds readMyBatisRowBounds(Request wsRequest) {
+    int pageIndex = wsRequest.mandatoryParamAsInt(WebService.Param.PAGE);
+    int pageSize = wsRequest.mandatoryParamAsInt(WebService.Param.PAGE_SIZE);
+    return new RowBounds((pageIndex - 1) * pageSize, pageSize);
+  }
 }
index d230c2d0cbbc78a3436fdb88b4d62cc571664d36..2c26dd970f43e0dc88c46950060073ab18c7de0b 100644 (file)
  */
 package org.sonar.server.computation.ws;
 
-import com.google.common.base.Strings;
 import java.io.InputStream;
 import org.apache.commons.lang.StringUtils;
 import org.sonar.api.server.ws.Request;
 import org.sonar.api.server.ws.Response;
 import org.sonar.api.server.ws.WebService;
 import org.sonar.server.computation.CeTask;
-import org.sonar.server.computation.ReportProcessingScheduler;
 import org.sonar.server.computation.ReportSubmitter;
 import org.sonar.server.ws.WsUtils;
 import org.sonarqube.ws.WsCe;
@@ -43,11 +41,9 @@ public class CeSubmitWsAction implements CeWsAction {
   public static final String PARAM_REPORT_DATA = "report";
 
   private final ReportSubmitter reportSubmitter;
-  private final ReportProcessingScheduler reportProcessingScheduler;
 
-  public CeSubmitWsAction(ReportSubmitter reportSubmitter, ReportProcessingScheduler reportProcessingScheduler) {
+  public CeSubmitWsAction(ReportSubmitter reportSubmitter) {
     this.reportSubmitter = reportSubmitter;
-    this.reportProcessingScheduler = reportProcessingScheduler;
   }
 
   @Override
@@ -85,12 +81,11 @@ public class CeSubmitWsAction implements CeWsAction {
   @Override
   public void handle(Request wsRequest, Response wsResponse) throws Exception {
     String projectKey = wsRequest.mandatoryParam(PARAM_PROJECT_KEY);
-    String projectBranch = Strings.emptyToNull(wsRequest.param(PARAM_PROJECT_BRANCH));
+    String projectBranch = wsRequest.param(PARAM_PROJECT_BRANCH);
     String projectName = StringUtils.defaultIfBlank(wsRequest.param(PARAM_PROJECT_NAME), projectKey);
     InputStream reportInput = wsRequest.paramAsInputStream(PARAM_REPORT_DATA);
 
     CeTask task = reportSubmitter.submit(projectKey, projectBranch, projectName, reportInput);
-    reportProcessingScheduler.startAnalysisTaskNow();
 
     WsCe.SubmitResponse submitResponse = WsCe.SubmitResponse.newBuilder()
       .setTaskId(task.getUuid())
index 0fa4ccab8ad036166a6846da6879b1e511908d90..0b06121221f09432e9b9925531e6ee2b44ac7f22 100644 (file)
@@ -64,7 +64,8 @@ import org.sonar.server.component.DefaultRubyComponentService;
 import org.sonar.server.component.ws.ComponentsWs;
 import org.sonar.server.component.ws.EventsWs;
 import org.sonar.server.component.ws.ResourcesWs;
-import org.sonar.server.computation.CeQueue;
+import org.sonar.server.computation.CeQueueImpl;
+import org.sonar.server.computation.CeQueueCleaner;
 import org.sonar.server.computation.CeQueueInitializer;
 import org.sonar.server.computation.CleanReportQueueListener;
 import org.sonar.server.computation.ComputeEngineProcessingModule;
@@ -718,7 +719,7 @@ public class PlatformLevel4 extends PlatformLevel {
       // Compute engine
       CEQueueStatusImpl.class,
       ComputeEngineQueueMonitor.class,
-      CeQueue.class,
+      CeQueueImpl.class,
       CleanReportQueueListener.class,
       ReportFiles.class,
       ComputeEngineProcessingModule.class,
@@ -734,6 +735,7 @@ public class PlatformLevel4 extends PlatformLevel {
       ProjectSettingsFactory.class,
       IndexPurgeListener.class,
       ReportSubmitter.class,
+      CeQueueCleaner.class,
       CeQueueInitializer.class,
       // Views plugin
       ViewsBootstrap.class,
index 059ab25bc277d01f3bc18253d08a60fe28d5915f..9854065033f76a673b58a8b30c116ec462d935bd 100644 (file)
@@ -52,7 +52,7 @@ public abstract class AbstractStoppableExecutorService<T extends ExecutorService
       if (!delegate.awaitTermination(5, TimeUnit.SECONDS)) {
         // Cancel currently executing tasks
         delegate.shutdownNow();
-        // Wait a while for tasks to respond to being cancelled
+        // Wait a while for tasks to respond to being canceled
         if (!delegate.awaitTermination(5, TimeUnit.SECONDS)) {
           Loggers.get(getClass()).error(format("Pool %s did not terminate", getClass().getSimpleName()));
         }
index 733cd6232f3ae425aeb29168e25c1ce5fe55b691..b7b3b2700794056668fd02ed640280e3b6a4a929 100644 (file)
@@ -1,4 +1,9 @@
 {
+  "paging": {
+    "pageIndex": 1,
+    "pageSize": 10,
+    "total": 233
+  },
   "tasks": [
     {
       "id": "BU_dO1vsORa8_beWCwsP",
diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/CeQueueCleanerTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/CeQueueCleanerTest.java
new file mode 100644 (file)
index 0000000..db5786d
--- /dev/null
@@ -0,0 +1,116 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation;
+
+import java.io.File;
+import java.io.IOException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.sonar.api.platform.ServerUpgradeStatus;
+import org.sonar.api.utils.System2;
+import org.sonar.db.DbSession;
+import org.sonar.db.DbTester;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.db.ce.CeTaskTypes;
+
+import static java.util.Arrays.asList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CeQueueCleanerTest {
+
+  @Rule
+  public DbTester dbTester = DbTester.create(System2.INSTANCE);
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  ServerUpgradeStatus serverUpgradeStatus = mock(ServerUpgradeStatus.class);
+  ReportFiles reportFiles = mock(ReportFiles.class, Mockito.RETURNS_DEEP_STUBS);
+  CeQueueImpl queue = mock(CeQueueImpl.class);
+  CeQueueCleaner underTest = new CeQueueCleaner(dbTester.getDbClient(), serverUpgradeStatus, reportFiles, queue);
+
+  @Test
+  public void reset_in_progress_tasks_to_pending() throws IOException {
+    insertInQueue("TASK_1", CeQueueDto.Status.PENDING);
+    insertInQueue("TASK_2", CeQueueDto.Status.IN_PROGRESS);
+
+    underTest.clean(dbTester.getSession());
+
+    assertThat(dbTester.getDbClient().ceQueueDao().countByStatus(dbTester.getSession(), CeQueueDto.Status.PENDING)).isEqualTo(2);
+    assertThat(dbTester.getDbClient().ceQueueDao().countByStatus(dbTester.getSession(), CeQueueDto.Status.IN_PROGRESS)).isEqualTo(0);
+  }
+
+  @Test
+  public void clear_queue_if_version_upgrade() {
+    when(serverUpgradeStatus.isUpgraded()).thenReturn(true);
+
+    underTest.clean(dbTester.getSession());
+
+    verify(queue).clear();
+  }
+
+  @Test
+  public void cancel_task_if_report_file_is_missing() throws IOException {
+    CeQueueDto task = insertInQueue("TASK_1", CeQueueDto.Status.PENDING, false);
+
+    underTest.clean(dbTester.getSession());
+
+    verify(queue).cancel(any(DbSession.class), eq(task));
+  }
+
+  @Test
+  public void delete_orphan_report_files() throws Exception {
+    // two files on disk but on task in queue
+    insertInQueue("TASK_1", CeQueueDto.Status.PENDING, true);
+    when(reportFiles.listUuids()).thenReturn(asList("TASK_1", "TASK_2"));
+
+    underTest.clean(dbTester.getSession());
+
+    verify(reportFiles).deleteIfExists("TASK_2");
+  }
+
+  private void insertInQueue(String taskUuid, CeQueueDto.Status status) throws IOException {
+    insertInQueue(taskUuid, status, true);
+  }
+
+  private CeQueueDto insertInQueue(String taskUuid, CeQueueDto.Status status, boolean createFile) throws IOException {
+    CeQueueDto queueDto = new CeQueueDto();
+    queueDto.setTaskType(CeTaskTypes.REPORT);
+    queueDto.setComponentUuid("PROJECT_1");
+    queueDto.setUuid(taskUuid);
+    queueDto.setStatus(status);
+    dbTester.getDbClient().ceQueueDao().insert(dbTester.getSession(), queueDto);
+    dbTester.getSession().commit();
+
+    File file = tempFolder.newFile();
+    when(reportFiles.fileForUuid(taskUuid)).thenReturn(file);
+    if (!createFile) {
+      file.delete();
+    }
+    return queueDto;
+  }
+}
diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/CeQueueImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/CeQueueImplTest.java
new file mode 100644 (file)
index 0000000..a7b378a
--- /dev/null
@@ -0,0 +1,219 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation;
+
+import com.google.common.base.Optional;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.sonar.api.utils.System2;
+import org.sonar.api.utils.internal.TestSystem2;
+import org.sonar.core.util.UuidFactory;
+import org.sonar.core.util.UuidFactoryImpl;
+import org.sonar.db.DbTester;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.server.computation.monitoring.CEQueueStatus;
+import org.sonar.server.computation.monitoring.CEQueueStatusImpl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.startsWith;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+public class CeQueueImplTest {
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  System2 system2 = new TestSystem2().setNow(1_450_000_000_000L);
+
+  @Rule
+  public DbTester dbTester = DbTester.create(system2);
+
+  UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE;
+  CEQueueStatus queueStatus = new CEQueueStatusImpl();
+  CeQueueListener listener = mock(CeQueueListener.class);
+  CeQueue underTest = new CeQueueImpl(system2, dbTester.getDbClient(), uuidFactory, queueStatus, new CeQueueListener[] {listener});
+
+  @Test
+  public void test_submit() {
+    TaskSubmission submission = underTest.prepareSubmit();
+    submission.setComponentUuid("PROJECT_1");
+    submission.setType(CeTaskTypes.REPORT);
+    submission.setSubmitterLogin("rob");
+
+    CeTask task = underTest.submit(submission);
+    assertThat(task.getUuid()).isEqualTo(submission.getUuid());
+    assertThat(task.getComponentUuid()).isEqualTo("PROJECT_1");
+    assertThat(task.getSubmitterLogin()).isEqualTo("rob");
+
+    Optional<CeQueueDto> queueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), submission.getUuid());
+    assertThat(queueDto.isPresent()).isTrue();
+    assertThat(queueDto.get().getTaskType()).isEqualTo(CeTaskTypes.REPORT);
+    assertThat(queueDto.get().getComponentUuid()).isEqualTo("PROJECT_1");
+    assertThat(queueDto.get().getSubmitterLogin()).isEqualTo("rob");
+    assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L);
+    assertThat(queueStatus.getReceivedCount()).isEqualTo(1L);
+  }
+
+  @Test
+  public void fail_to_submit_if_paused() {
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage("Compute Engine does not currently accept new tasks");
+    underTest.pauseSubmit();
+
+    submit(CeTaskTypes.REPORT, "PROJECT_1");
+  }
+
+  @Test
+  public void test_remove() {
+    CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+    Optional<CeTask> peek = underTest.peek();
+    underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS);
+
+    // queue is empty
+    assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), task.getUuid()).isPresent()).isFalse();
+    assertThat(underTest.peek().isPresent()).isFalse();
+
+    // available in history
+    Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid());
+    assertThat(history.isPresent()).isTrue();
+    assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.SUCCESS);
+    assertThat(history.get().getIsLast()).isTrue();
+
+    verify(listener).onRemoved(task, CeActivityDto.Status.SUCCESS);
+  }
+
+  @Test
+  public void fail_to_remove_if_not_in_queue() throws Exception {
+    expectedException.expect(IllegalStateException.class);
+    CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+    underTest.remove(task, CeActivityDto.Status.SUCCESS);
+
+    // fail
+    underTest.remove(task, CeActivityDto.Status.SUCCESS);
+  }
+
+  @Test
+  public void test_peek() throws Exception {
+    CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+
+    Optional<CeTask> peek = underTest.peek();
+    assertThat(peek.isPresent()).isTrue();
+    assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
+    assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT);
+    assertThat(peek.get().getComponentUuid()).isEqualTo("PROJECT_1");
+
+    // no more pending tasks
+    peek = underTest.peek();
+    assertThat(peek.isPresent()).isFalse();
+
+    verify(listener, never()).onRemoved(eq(task), any(CeActivityDto.Status.class));
+  }
+
+  @Test
+  public void peek_nothing_if_paused() throws Exception {
+    submit(CeTaskTypes.REPORT, "PROJECT_1");
+    underTest.pausePeek();
+
+    Optional<CeTask> peek = underTest.peek();
+    assertThat(peek.isPresent()).isFalse();
+  }
+
+  @Test
+  public void cancel_pending() throws Exception {
+    CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+
+    // ignore
+    boolean canceled = underTest.cancel("UNKNOWN");
+    assertThat(canceled).isFalse();
+    verifyZeroInteractions(listener);
+
+    canceled = underTest.cancel(task.getUuid());
+    assertThat(canceled).isTrue();
+    Optional<CeActivityDto> activity = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid());
+    assertThat(activity.isPresent()).isTrue();
+    assertThat(activity.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
+    verify(listener).onRemoved(task, CeActivityDto.Status.CANCELED);
+  }
+
+  @Test
+  public void fail_to_cancel_if_in_progress() throws Exception {
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage(startsWith("Task is in progress and can't be canceled"));
+
+    CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+    underTest.peek();
+
+    underTest.cancel(task.getUuid());
+  }
+
+  @Test
+  public void cancelAll_pendings_but_not_in_progress() throws Exception {
+    CeTask inProgressTask = submit(CeTaskTypes.REPORT, "PROJECT_1");
+    CeTask pendingTask1 = submit(CeTaskTypes.REPORT, "PROJECT_2");
+    CeTask pendingTask2 = submit(CeTaskTypes.REPORT, "PROJECT_3");
+    underTest.peek();
+
+    int canceledCount = underTest.cancelAll();
+    assertThat(canceledCount).isEqualTo(2);
+
+    Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), pendingTask1.getUuid());
+    assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
+    history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), pendingTask2.getUuid());
+    assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
+    history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), inProgressTask.getUuid());
+    assertThat(history.isPresent()).isFalse();
+
+    verify(listener).onRemoved(pendingTask1, CeActivityDto.Status.CANCELED);
+    verify(listener).onRemoved(pendingTask2, CeActivityDto.Status.CANCELED);
+  }
+
+  @Test
+  public void pause_and_resume_submits() throws Exception {
+    assertThat(underTest.isSubmitPaused()).isFalse();
+    underTest.pauseSubmit();
+    assertThat(underTest.isSubmitPaused()).isTrue();
+    underTest.resumeSubmit();
+    assertThat(underTest.isSubmitPaused()).isFalse();
+  }
+
+  @Test
+  public void pause_and_resume_peeks() throws Exception {
+    assertThat(underTest.isPeekPaused()).isFalse();
+    underTest.pausePeek();
+    assertThat(underTest.isPeekPaused()).isTrue();
+    underTest.resumePeek();
+    assertThat(underTest.isPeekPaused()).isFalse();
+  }
+
+  private CeTask submit(String reportType, String componentUuid) {
+    TaskSubmission submission = underTest.prepareSubmit();
+    submission.setType(reportType);
+    submission.setComponentUuid(componentUuid);
+    return underTest.submit(submission);
+  }
+}
index 046da85ea64c888dded7de2712acf34d75f36426..495c7eebda7e2f5d237d68985e991747d32eca91 100644 (file)
@@ -24,8 +24,8 @@ import java.io.IOException;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.InOrder;
 import org.mockito.Mockito;
-import org.sonar.api.platform.ServerUpgradeStatus;
 import org.sonar.api.utils.System2;
 import org.sonar.db.DbSession;
 import org.sonar.db.DbTester;
@@ -34,12 +34,9 @@ import org.sonar.db.ce.CeTaskTypes;
 import org.sonar.server.computation.monitoring.CEQueueStatus;
 import org.sonar.server.computation.monitoring.CEQueueStatusImpl;
 
-import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class CeQueueInitializerTest {
@@ -50,11 +47,11 @@ public class CeQueueInitializerTest {
   @Rule
   public TemporaryFolder tempFolder = new TemporaryFolder();
 
-  ServerUpgradeStatus serverUpgradeStatus = mock(ServerUpgradeStatus.class);
   ReportFiles reportFiles = mock(ReportFiles.class, Mockito.RETURNS_DEEP_STUBS);
-  CeQueue queue = mock(CeQueue.class);
   CEQueueStatus queueStatus = new CEQueueStatusImpl();
-  CeQueueInitializer underTest = new CeQueueInitializer(dbTester.getDbClient(), serverUpgradeStatus, reportFiles, queue, queueStatus);
+  CeQueueCleaner cleaner = mock(CeQueueCleaner.class);
+  ReportProcessingScheduler scheduler = mock(ReportProcessingScheduler.class);
+  CeQueueInitializer underTest = new CeQueueInitializer(dbTester.getDbClient(), queueStatus, cleaner, scheduler);
 
   @Test
   public void init_jmx_counters() throws IOException {
@@ -76,43 +73,13 @@ public class CeQueueInitializerTest {
   }
 
   @Test
-  public void reset_in_progress_tasks_to_pending() throws IOException {
-    insertInQueue("TASK_1", CeQueueDto.Status.PENDING);
-    insertInQueue("TASK_2", CeQueueDto.Status.IN_PROGRESS);
-
-    underTest.start();
-
-    assertThat(dbTester.getDbClient().ceQueueDao().countByStatus(dbTester.getSession(), CeQueueDto.Status.PENDING)).isEqualTo(2);
-    assertThat(dbTester.getDbClient().ceQueueDao().countByStatus(dbTester.getSession(), CeQueueDto.Status.IN_PROGRESS)).isEqualTo(0);
-  }
-
-  @Test
-  public void clear_queue_if_version_upgrade() {
-    when(serverUpgradeStatus.isUpgraded()).thenReturn(true);
-
-    underTest.start();
-
-    verify(queue).clear();
-  }
-
-  @Test
-  public void cancel_task_if_report_file_is_missing() throws IOException {
-    CeQueueDto task = insertInQueue("TASK_1", CeQueueDto.Status.PENDING, false);
-
-    underTest.start();
-
-    verify(queue).cancel(any(DbSession.class), eq(task));
-  }
-
-  @Test
-  public void delete_orphan_report_files() throws Exception {
-    // two files on disk but on task in queue
-    insertInQueue("TASK_1", CeQueueDto.Status.PENDING, true);
-    when(reportFiles.listUuids()).thenReturn(asList("TASK_1", "TASK_2"));
+  public void clean_queue_then_start_scheduler_of_workers() throws IOException {
+    InOrder inOrder = Mockito.inOrder(cleaner, scheduler);
 
     underTest.start();
 
-    verify(reportFiles).deleteIfExists("TASK_2");
+    inOrder.verify(cleaner).clean(any(DbSession.class));
+    inOrder.verify(scheduler).schedule();
   }
 
   private void insertInQueue(String taskUuid, CeQueueDto.Status status) throws IOException {
diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/CeQueueTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/CeQueueTest.java
deleted file mode 100644 (file)
index 0f0939c..0000000
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.server.computation;
-
-import com.google.common.base.Optional;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.sonar.api.utils.System2;
-import org.sonar.api.utils.internal.TestSystem2;
-import org.sonar.core.util.UuidFactory;
-import org.sonar.core.util.UuidFactoryImpl;
-import org.sonar.db.DbTester;
-import org.sonar.db.ce.CeActivityDto;
-import org.sonar.db.ce.CeQueueDto;
-import org.sonar.db.ce.CeTaskTypes;
-import org.sonar.server.computation.monitoring.CEQueueStatus;
-import org.sonar.server.computation.monitoring.CEQueueStatusImpl;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.hamcrest.Matchers.startsWith;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-
-public class CeQueueTest {
-
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
-  System2 system2 = new TestSystem2().setNow(1_450_000_000_000L);
-
-  @Rule
-  public DbTester dbTester = DbTester.create(system2);
-
-  UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE;
-  CEQueueStatus queueStatus = new CEQueueStatusImpl();
-  CeQueueListener listener = mock(CeQueueListener.class);
-  CeQueue underTest = new CeQueue(system2, dbTester.getDbClient(), uuidFactory, queueStatus, new CeQueueListener[] {listener});
-
-  @Test
-  public void test_submit() {
-    CeTaskSubmit submit = underTest.prepareSubmit();
-    submit.setComponentUuid("PROJECT_1");
-    submit.setType(CeTaskTypes.REPORT);
-    submit.setSubmitterLogin("rob");
-
-    CeTask task = underTest.submit(submit);
-    assertThat(task.getUuid()).isEqualTo(submit.getUuid());
-    assertThat(task.getComponentUuid()).isEqualTo("PROJECT_1");
-    assertThat(task.getSubmitterLogin()).isEqualTo("rob");
-
-    Optional<CeQueueDto> queueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), submit.getUuid());
-    assertThat(queueDto.isPresent()).isTrue();
-    assertThat(queueDto.get().getTaskType()).isEqualTo(CeTaskTypes.REPORT);
-    assertThat(queueDto.get().getComponentUuid()).isEqualTo("PROJECT_1");
-    assertThat(queueDto.get().getSubmitterLogin()).isEqualTo("rob");
-    assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L);
-    assertThat(queueStatus.getReceivedCount()).isEqualTo(1L);
-  }
-
-  @Test
-  public void fail_to_submit_if_paused() {
-    expectedException.expect(IllegalStateException.class);
-    expectedException.expectMessage("Compute Engine does not currently accept new tasks");
-    underTest.pauseSubmit();
-
-    submit(CeTaskTypes.REPORT, "PROJECT_1");
-  }
-
-  @Test
-  public void test_remove() {
-    CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
-    Optional<CeTask> peek = underTest.peek();
-    underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS);
-
-    // queue is empty
-    assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), task.getUuid()).isPresent()).isFalse();
-    assertThat(underTest.peek().isPresent()).isFalse();
-
-    // available in history
-    Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid());
-    assertThat(history.isPresent()).isTrue();
-    assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.SUCCESS);
-    assertThat(history.get().getIsLast()).isTrue();
-
-    verify(listener).onRemoved(task, CeActivityDto.Status.SUCCESS);
-  }
-
-  @Test
-  public void fail_to_remove_if_not_in_queue() throws Exception {
-    expectedException.expect(IllegalStateException.class);
-    CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
-    underTest.remove(task, CeActivityDto.Status.SUCCESS);
-
-    // fail
-    underTest.remove(task, CeActivityDto.Status.SUCCESS);
-  }
-
-  @Test
-  public void test_peek() throws Exception {
-    CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
-
-    Optional<CeTask> peek = underTest.peek();
-    assertThat(peek.isPresent()).isTrue();
-    assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
-    assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT);
-    assertThat(peek.get().getComponentUuid()).isEqualTo("PROJECT_1");
-
-    // no more pending tasks
-    peek = underTest.peek();
-    assertThat(peek.isPresent()).isFalse();
-
-    verify(listener, never()).onRemoved(eq(task), any(CeActivityDto.Status.class));
-  }
-
-  @Test
-  public void peek_nothing_if_paused() throws Exception {
-    submit(CeTaskTypes.REPORT, "PROJECT_1");
-    underTest.pausePeek();
-
-    Optional<CeTask> peek = underTest.peek();
-    assertThat(peek.isPresent()).isFalse();
-  }
-
-  @Test
-  public void cancel_pending() throws Exception {
-    CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
-
-    // ignore
-    boolean canceled = underTest.cancel("UNKNOWN");
-    assertThat(canceled).isFalse();
-    verifyZeroInteractions(listener);
-
-    canceled = underTest.cancel(task.getUuid());
-    assertThat(canceled).isTrue();
-    Optional<CeActivityDto> activity = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid());
-    assertThat(activity.isPresent()).isTrue();
-    assertThat(activity.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
-    verify(listener).onRemoved(task, CeActivityDto.Status.CANCELED);
-  }
-
-  @Test
-  public void fail_to_cancel_if_in_progress() throws Exception {
-    expectedException.expect(IllegalStateException.class);
-    expectedException.expectMessage(startsWith("Task is in progress and can't be cancelled"));
-
-    CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
-    underTest.peek();
-
-    underTest.cancel(task.getUuid());
-  }
-
-  @Test
-  public void cancelAll_pendings_but_not_in_progress() throws Exception {
-    CeTask inProgressTask = submit(CeTaskTypes.REPORT, "PROJECT_1");
-    CeTask pendingTask1 = submit(CeTaskTypes.REPORT, "PROJECT_2");
-    CeTask pendingTask2 = submit(CeTaskTypes.REPORT, "PROJECT_3");
-    underTest.peek();
-
-    int canceledCount = underTest.cancelAll();
-    assertThat(canceledCount).isEqualTo(2);
-
-    Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), pendingTask1.getUuid());
-    assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
-    history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), pendingTask2.getUuid());
-    assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
-    history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), inProgressTask.getUuid());
-    assertThat(history.isPresent()).isFalse();
-
-    verify(listener).onRemoved(pendingTask1, CeActivityDto.Status.CANCELED);
-    verify(listener).onRemoved(pendingTask2, CeActivityDto.Status.CANCELED);
-  }
-
-  @Test
-  public void pause_and_resume_submits() throws Exception {
-    assertThat(underTest.isSubmitPaused()).isFalse();
-    underTest.pauseSubmit();
-    assertThat(underTest.isSubmitPaused()).isTrue();
-    underTest.resumeSubmit();
-    assertThat(underTest.isSubmitPaused()).isFalse();
-  }
-
-  @Test
-  public void pause_and_resume_peeks() throws Exception {
-    assertThat(underTest.isPeekPaused()).isFalse();
-    underTest.pausePeek();
-    assertThat(underTest.isPeekPaused()).isTrue();
-    underTest.resumePeek();
-    assertThat(underTest.isPeekPaused()).isFalse();
-  }
-
-  private CeTask submit(String reportType, String componentUuid) {
-    CeTaskSubmit submit = underTest.prepareSubmit();
-    submit.setType(reportType);
-    submit.setComponentUuid(componentUuid);
-    return underTest.submit(submit);
-  }
-}
index aed8a4a450aa1908963a01487d078d4444263b1c..cc33b0d81e8ae42eb814f5963c369e6a30d11748 100644 (file)
@@ -26,15 +26,13 @@ import org.sonar.db.ce.CeTaskTypes;
 
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
 public class CeWorkerImplTest {
 
-  CeQueue queue = mock(CeQueue.class);
+  CeQueue queue = mock(CeQueueImpl.class);
   ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
   CeWorker underTest = new CeWorkerImpl(queue, taskProcessor);
 
index 51cc29f5469095cc558a5d46bdcb6db3f2d387ea..74bbd25f030361673aada30d0761cc9a3696a04d 100644 (file)
@@ -26,12 +26,10 @@ import java.util.concurrent.TimeUnit;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.sonar.api.platform.Server;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -44,20 +42,10 @@ public class ReportProcessingSchedulerTest {
 
   @Test
   public void schedule_at_fixed_rate_adding_a_ReportProcessingTask_to_the_queue() throws Exception {
-    when(batchExecutorService.scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(10L), eq(TimeUnit.SECONDS)))
+    when(batchExecutorService.scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(1L), eq(TimeUnit.SECONDS)))
       .thenAnswer(new ExecuteFirstArgAsRunnable());
 
-    underTest.onServerStart(mock(Server.class));
-
-    assertThat(processingQueue.getTasks()).hasSize(1);
-    assertThat(processingQueue.getTasks().iterator().next()).isInstanceOf(CeWorker.class);
-  }
-
-  @Test
-  public void adds_immediately_a_ReportProcessingTask_to_the_queue() throws Exception {
-    doAnswer(new ExecuteFirstArgAsRunnable()).when(batchExecutorService).execute(any(Runnable.class));
-
-    underTest.startAnalysisTaskNow();
+    underTest.schedule();
 
     assertThat(processingQueue.getTasks()).hasSize(1);
     assertThat(processingQueue.getTasks().iterator().next()).isInstanceOf(CeWorker.class);
index 856c691211ec77ede200d0ee692c0063c1828554..88b88f086b34863decab0fe8357acb8bcbba898d 100644 (file)
@@ -42,22 +42,22 @@ public class ReportSubmitterTest {
   @Rule
   public UserSessionRule userSession = UserSessionRule.standalone();
 
-  CeQueue queue = mock(CeQueue.class);
+  CeQueue queue = mock(CeQueueImpl.class);
   ReportFiles reportFiles = mock(ReportFiles.class);
   ComponentService componentService = mock(ComponentService.class);
   ReportSubmitter underTest = new ReportSubmitter(queue, userSession, reportFiles, componentService);
 
   @Test
   public void submit_a_report_on_existing_project() {
-    when(queue.prepareSubmit()).thenReturn(new CeTaskSubmit("TASK_1"));
+    when(queue.prepareSubmit()).thenReturn(new TestTaskSubmission("TASK_1"));
     userSession.setGlobalPermissions(GlobalPermissions.SCAN_EXECUTION);
     when(componentService.getNullableByKey("MY_PROJECT")).thenReturn(new ComponentDto().setUuid("P1"));
 
     underTest.submit("MY_PROJECT", null, "My Project", IOUtils.toInputStream("{binary}"));
 
-    verify(queue).submit(argThat(new TypeSafeMatcher<CeTaskSubmit>() {
+    verify(queue).submit(argThat(new TypeSafeMatcher<TaskSubmission>() {
       @Override
-      protected boolean matchesSafely(CeTaskSubmit submit) {
+      protected boolean matchesSafely(TaskSubmission submit) {
         return submit.getType().equals(CeTaskTypes.REPORT) && submit.getComponentUuid().equals("P1") &&
           submit.getUuid().equals("TASK_1");
       }
@@ -71,16 +71,16 @@ public class ReportSubmitterTest {
 
   @Test
   public void provision_project_if_does_not_exist() throws Exception {
-    when(queue.prepareSubmit()).thenReturn(new CeTaskSubmit("TASK_1"));
+    when(queue.prepareSubmit()).thenReturn(new TestTaskSubmission("TASK_1"));
     userSession.setGlobalPermissions(GlobalPermissions.SCAN_EXECUTION, GlobalPermissions.PROVISIONING);
     when(componentService.getNullableByKey("MY_PROJECT")).thenReturn(null);
     when(componentService.create(any(NewComponent.class))).thenReturn(new ComponentDto().setUuid("P1"));
 
     underTest.submit("MY_PROJECT", null, "My Project", IOUtils.toInputStream("{binary}"));
 
-    verify(queue).submit(argThat(new TypeSafeMatcher<CeTaskSubmit>() {
+    verify(queue).submit(argThat(new TypeSafeMatcher<TaskSubmission>() {
       @Override
-      protected boolean matchesSafely(CeTaskSubmit submit) {
+      protected boolean matchesSafely(TaskSubmission submit) {
         return submit.getType().equals(CeTaskTypes.REPORT) && submit.getComponentUuid().equals("P1") &&
           submit.getUuid().equals("TASK_1");
       }
diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/TestTaskSubmission.java b/server/sonar-server/src/test/java/org/sonar/server/computation/TestTaskSubmission.java
new file mode 100644 (file)
index 0000000..e4fe390
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation;
+
+import javax.annotation.Nullable;
+
+public class TestTaskSubmission implements TaskSubmission {
+  private final String uuid;
+  private String type;
+  private String componentUuid;
+  private String submitterLogin;
+
+  public TestTaskSubmission(String uuid) {
+    this.uuid = uuid;
+  }
+
+  @Override
+  public String getUuid() {
+    return uuid;
+  }
+
+  @Override
+  public String getType() {
+    return type;
+  }
+
+  @Override
+  public TaskSubmission setType(String s) {
+    this.type = s;
+    return this;
+  }
+
+  @Override
+  public String getComponentUuid() {
+    return componentUuid;
+  }
+
+  @Override
+  public TaskSubmission setComponentUuid(@Nullable String s) {
+    this.componentUuid = s;
+    return this;
+  }
+
+  @Override
+  public String getSubmitterLogin() {
+    return submitterLogin;
+  }
+
+  @Override
+  public TaskSubmission setSubmitterLogin(@Nullable String s) {
+    this.submitterLogin = s;
+    return this;
+  }
+}
index 18e338d7ebc5c093105b840fbf8a9158f2e76ff2..bbd2cc07c589ab491540c5b294b9859314e40c52 100644 (file)
@@ -20,7 +20,7 @@
 package org.sonar.server.computation.monitoring;
 
 import org.junit.Test;
-import org.sonar.server.computation.CeQueue;
+import org.sonar.server.computation.CeQueueImpl;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.entry;
@@ -34,7 +34,7 @@ public class ComputeEngineQueueMonitorTest {
   private static final long SUCCESS_COUNT = 13;
   private static final long PROCESSING_TIME = 987;
 
-  private ComputeEngineQueueMonitor underTest = new ComputeEngineQueueMonitor(new DumbCEQueueStatus(), mock(CeQueue.class));
+  private ComputeEngineQueueMonitor underTest = new ComputeEngineQueueMonitor(new DumbCEQueueStatus(), mock(CeQueueImpl.class));
 
   @Test
   public void name_is_ComputeEngineQueue() {
diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/ws/CeActivityWsActionTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/ws/CeActivityWsActionTest.java
new file mode 100644 (file)
index 0000000..4835be6
--- /dev/null
@@ -0,0 +1,174 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation.ws;
+
+import java.util.Collections;
+import java.util.List;
+import org.junit.Rule;
+import org.junit.Test;
+import org.sonar.api.server.ws.WebService;
+import org.sonar.api.utils.System2;
+import org.sonar.api.web.UserRole;
+import org.sonar.core.util.Protobuf;
+import org.sonar.db.DbTester;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.server.plugins.MimeTypes;
+import org.sonar.server.tester.UserSessionRule;
+import org.sonar.server.ws.TestResponse;
+import org.sonar.server.ws.WsActionTester;
+import org.sonarqube.ws.WsCe;
+
+import static java.util.Arrays.asList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CeActivityWsActionTest {
+
+  @Rule
+  public UserSessionRule userSession = UserSessionRule.standalone();
+
+  @Rule
+  public DbTester dbTester = DbTester.create(System2.INSTANCE);
+
+  CeWsTaskFormatter formatter = new CeWsTaskFormatter(dbTester.getDbClient());
+  CeActivityWsAction underTest = new CeActivityWsAction(userSession, dbTester.getDbClient(), formatter);
+  WsActionTester tester = new WsActionTester(underTest);
+
+  @Test
+  public void get_all_past_activity() {
+    userSession.setGlobalPermissions(UserRole.ADMIN);
+    insert("T1", "PROJECT_1", CeActivityDto.Status.SUCCESS);
+    insert("T2", "PROJECT_2", CeActivityDto.Status.FAILED);
+
+    TestResponse wsResponse = tester.newRequest()
+      .setMediaType(MimeTypes.PROTOBUF)
+      .execute();
+
+    // verify the protobuf response
+    WsCe.ActivityResponse activityResponse = Protobuf.read(wsResponse.getInputStream(), WsCe.ActivityResponse.PARSER);
+    assertThat(activityResponse.getTasksCount()).isEqualTo(2);
+
+    // chronological order, from newest to oldest
+    assertThat(activityResponse.getTasks(0).getId()).isEqualTo("T2");
+    assertThat(activityResponse.getTasks(0).getStatus()).isEqualTo(WsCe.TaskStatus.FAILED);
+    assertThat(activityResponse.getTasks(0).getComponentId()).isEqualTo("PROJECT_2");
+    assertThat(activityResponse.getTasks(0).getExecutionTimeMs()).isEqualTo(500L);
+    assertThat(activityResponse.getTasks(1).getId()).isEqualTo("T1");
+    assertThat(activityResponse.getTasks(1).getStatus()).isEqualTo(WsCe.TaskStatus.SUCCESS);
+    assertThat(activityResponse.getTasks(1).getComponentId()).isEqualTo("PROJECT_1");
+  }
+
+  @Test
+   public void filter_by_status() {
+    userSession.setGlobalPermissions(UserRole.ADMIN);
+    insert("T1", "PROJECT_1", CeActivityDto.Status.SUCCESS);
+    insert("T2", "PROJECT_2", CeActivityDto.Status.FAILED);
+
+    TestResponse wsResponse = tester.newRequest()
+      .setParam("status", "FAILED")
+      .setMediaType(MimeTypes.PROTOBUF)
+      .execute();
+
+    WsCe.ActivityResponse activityResponse = Protobuf.read(wsResponse.getInputStream(), WsCe.ActivityResponse.PARSER);
+    assertThat(activityResponse.getTasksCount()).isEqualTo(1);
+    assertThat(activityResponse.getTasks(0).getId()).isEqualTo("T2");
+  }
+
+  @Test
+  public void filter_on_current_activities() {
+    userSession.setGlobalPermissions(UserRole.ADMIN);
+    // T2 is the current activity (the most recent one)
+    insert("T1", "PROJECT_1", CeActivityDto.Status.SUCCESS);
+    insert("T2", "PROJECT_1", CeActivityDto.Status.FAILED);
+
+    TestResponse wsResponse = tester.newRequest()
+      .setParam("onlyCurrents", "true")
+      .setMediaType(MimeTypes.PROTOBUF)
+      .execute();
+
+    WsCe.ActivityResponse activityResponse = Protobuf.read(wsResponse.getInputStream(), WsCe.ActivityResponse.PARSER);
+    assertThat(activityResponse.getTasksCount()).isEqualTo(1);
+    assertThat(activityResponse.getTasks(0).getId()).isEqualTo("T2");
+  }
+
+  @Test
+  public void paginate_results() {
+    userSession.setGlobalPermissions(UserRole.ADMIN);
+    insert("T1", "PROJECT_1", CeActivityDto.Status.SUCCESS);
+    insert("T2", "PROJECT_2", CeActivityDto.Status.FAILED);
+
+    assertPage(1, 1, 2, asList("T2"));
+    assertPage(2, 1, 2, asList("T1"));
+    assertPage(1, 10, 2, asList("T2", "T1"));
+    assertPage(2, 10, 2, Collections.<String>emptyList());
+  }
+
+  private void assertPage(int pageIndex, int pageSize, int expectedTotal, List<String> expectedOrderedTaskIds) {
+    TestResponse wsResponse = tester.newRequest()
+      .setMediaType(MimeTypes.PROTOBUF)
+      .setParam(WebService.Param.PAGE, Integer.toString(pageIndex))
+      .setParam(WebService.Param.PAGE_SIZE, Integer.toString(pageSize))
+      .execute();
+
+    WsCe.ActivityResponse activityResponse = Protobuf.read(wsResponse.getInputStream(), WsCe.ActivityResponse.PARSER);
+    assertThat(activityResponse.getPaging().getPageIndex()).isEqualTo(pageIndex);
+    assertThat(activityResponse.getPaging().getPageSize()).isEqualTo(pageSize);
+    assertThat(activityResponse.getPaging().getTotal()).isEqualTo(expectedTotal);
+
+    assertThat(activityResponse.getTasksCount()).isEqualTo(expectedOrderedTaskIds.size());
+    for (int i = 0; i < expectedOrderedTaskIds.size(); i++) {
+      String expectedTaskId = expectedOrderedTaskIds.get(i);
+      assertThat(activityResponse.getTasks(i).getId()).isEqualTo(expectedTaskId);
+    }
+  }
+
+  @Test
+  public void get_project_activity() {
+    userSession.addProjectUuidPermissions(UserRole.ADMIN, "PROJECT_1");
+    insert("T1", "PROJECT_1", CeActivityDto.Status.SUCCESS);
+    insert("T2", "PROJECT_2", CeActivityDto.Status.FAILED);
+
+    TestResponse wsResponse = tester.newRequest()
+      .setParam("componentId", "PROJECT_1")
+      .setMediaType(MimeTypes.PROTOBUF)
+      .execute();
+
+    // verify the protobuf response
+    WsCe.ActivityResponse activityResponse = Protobuf.read(wsResponse.getInputStream(), WsCe.ActivityResponse.PARSER);
+    assertThat(activityResponse.getTasksCount()).isEqualTo(1);
+    assertThat(activityResponse.getTasks(0).getId()).isEqualTo("T1");
+    assertThat(activityResponse.getTasks(0).getStatus()).isEqualTo(WsCe.TaskStatus.SUCCESS);
+    assertThat(activityResponse.getTasks(0).getComponentId()).isEqualTo("PROJECT_1");
+  }
+
+  private CeActivityDto insert(String taskUuid, String componentUuid, CeActivityDto.Status status) {
+    CeQueueDto queueDto = new CeQueueDto();
+    queueDto.setTaskType(CeTaskTypes.REPORT);
+    queueDto.setComponentUuid(componentUuid);
+    queueDto.setUuid(taskUuid);
+    CeActivityDto activityDto = new CeActivityDto(queueDto);
+    activityDto.setStatus(status);
+    activityDto.setExecutionTimeMs(500L);
+    dbTester.getDbClient().ceActivityDao().insert(dbTester.getSession(), activityDto);
+    dbTester.getSession().commit();
+    return activityDto;
+  }
+}
index 667df7baabb7b13a738f56299e72406299c298af..01ead11b507d13cd5e699bb2403b4a8880dfa1d0 100644 (file)
@@ -25,7 +25,6 @@ import org.mockito.Matchers;
 import org.sonar.core.util.Protobuf;
 import org.sonar.db.ce.CeTaskTypes;
 import org.sonar.server.computation.CeTask;
-import org.sonar.server.computation.ReportProcessingScheduler;
 import org.sonar.server.computation.ReportSubmitter;
 import org.sonar.server.plugins.MimeTypes;
 import org.sonar.server.ws.TestResponse;
@@ -43,8 +42,7 @@ import static org.mockito.Mockito.when;
 public class CeSubmitWsActionTest {
 
   ReportSubmitter reportSubmitter = mock(ReportSubmitter.class);
-  ReportProcessingScheduler reportProcessingScheduler = mock(ReportProcessingScheduler.class);
-  CeSubmitWsAction underTest = new CeSubmitWsAction(reportSubmitter, reportProcessingScheduler);
+  CeSubmitWsAction underTest = new CeSubmitWsAction(reportSubmitter);
   WsActionTester tester = new WsActionTester(underTest);
 
   @Test
@@ -61,16 +59,14 @@ public class CeSubmitWsActionTest {
       .execute();
 
     verify(reportSubmitter).submit(eq("my_project"), Matchers.isNull(String.class), eq("My Project"), any(InputStream.class));
-    verify(reportProcessingScheduler).startAnalysisTaskNow();
 
-    // verify the protobuf response
     WsCe.SubmitResponse submitResponse = Protobuf.read(wsResponse.getInputStream(), WsCe.SubmitResponse.PARSER);
     assertThat(submitResponse.getTaskId()).isEqualTo("TASK_1");
     assertThat(submitResponse.getProjectId()).isEqualTo("PROJECT_1");
   }
 
   @Test
-  public void test_response_example() {
+  public void test_example_json_response() {
     CeTask task = new CeTask("TASK_1", CeTaskTypes.REPORT, "PROJECT_1", "robert");
     when(reportSubmitter.submit(eq("my_project"), Matchers.isNull(String.class), eq("My Project"), any(InputStream.class))).thenReturn(task);
 
index 7dcf0f5074b789e7030e77b9e3805fd255163b6d..6b7574c8ed1602db3dbd89ed81d25862ada06789 100644 (file)
@@ -22,7 +22,6 @@ package org.sonar.server.computation.ws;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.sonar.api.server.ws.WebService;
-import org.sonar.server.computation.ReportProcessingScheduler;
 import org.sonar.server.computation.ReportSubmitter;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -32,7 +31,7 @@ public class CeWsTest {
 
   @Test
   public void define() throws Exception {
-    CeWsAction wsAction = new CeSubmitWsAction(mock(ReportSubmitter.class), mock(ReportProcessingScheduler.class));
+    CeWsAction wsAction = new CeSubmitWsAction(mock(ReportSubmitter.class));
 
     CeWs ws = new CeWs(wsAction);
     WebService.Context context = mock(WebService.Context.class, Mockito.RETURNS_DEEP_STUBS);
index e2bb4cb8fa5a9a19c888b9a3007274d942888c16..88e5fa87c186318cda395cbfe530decec777f353 100644 (file)
@@ -39,6 +39,7 @@ class CreateCeActivity < ActiveRecord::Migration
       t.column 'execution_time_ms', :big_integer, :null => true
     end
     add_index 'ce_activity', 'uuid', :name => 'ce_activity_uuid', :unique => true
+    add_index 'ce_activity', 'component_uuid', :name => 'ce_activity_component_uuid'
   end
 
 end
index b5460a6e2a1aa7fe912affe4883954783a3c034b..3051ba4006700225040a6b17166bef5454d8d16d 100644 (file)
  */
 package org.sonar.batch.util;
 
+import com.google.common.base.Strings;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 
+import javax.annotation.Nullable;
 import org.apache.commons.lang.StringUtils;
 
 public class BatchUtils {
@@ -38,9 +40,9 @@ public class BatchUtils {
     return StringUtils.replace(cleanKey, ":", "_");
   }
   
-  public static String encodeForUrl(String url) {
+  public static String encodeForUrl(@Nullable String url) {
     try {
-      return URLEncoder.encode(url, "UTF-8");
+      return URLEncoder.encode(Strings.nullToEmpty(url), "UTF-8");
 
     } catch (UnsupportedEncodingException e) {
       throw new IllegalStateException("Encoding not supported", e);
diff --git a/sonar-batch/src/test/java/org/sonar/batch/util/BatchUtilsTest.java b/sonar-batch/src/test/java/org/sonar/batch/util/BatchUtilsTest.java
new file mode 100644 (file)
index 0000000..7926172
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.batch.util;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class BatchUtilsTest {
+
+  @Test
+  public void encodeForUrl() throws Exception {
+    assertThat(BatchUtils.encodeForUrl(null)).isEqualTo("");
+    assertThat(BatchUtils.encodeForUrl("")).isEqualTo("");
+    assertThat(BatchUtils.encodeForUrl("foo")).isEqualTo("foo");
+    assertThat(BatchUtils.encodeForUrl("foo&bar")).isEqualTo("foo%26bar");
+  }
+}
index fb2f47a900fd07d54b5dcce3560aa540b5cc6ad2..58214c0c0ad9043c19d8ea65acda5d2d351513f7 100644 (file)
@@ -57,12 +57,16 @@ public class CeActivityDao implements Dao {
   }
 
   /**
-   * Ordered by id asc -> oldest to newest
+   * Ordered by id desc -> newest to oldest
    */
   public List<CeActivityDto> selectByQuery(DbSession dbSession, CeActivityQuery query, RowBounds rowBounds) {
     return mapper(dbSession).selectByQuery(query, rowBounds);
   }
 
+  public int countByQuery(DbSession dbSession, CeActivityQuery query) {
+    return mapper(dbSession).countByQuery(query);
+  }
+
   private CeActivityMapper mapper(DbSession dbSession) {
     return dbSession.getMapper(CeActivityMapper.class);
   }
index e39406d7977418f8735ee7db85072894ae7ceab7..7c019b25eb44f164092088743d9b0c931965c271 100644 (file)
@@ -35,6 +35,8 @@ public interface CeActivityMapper {
 
   List<CeActivityDto> selectByQuery(@Param("query") CeActivityQuery query, RowBounds rowBounds);
 
+  int countByQuery(@Param("query") CeActivityQuery query);
+
   void insert(CeActivityDto dto);
 
   void updateIsLastToFalseForLastKey(@Param("isLastKey") String isLastKey, @Param("updatedAt") long updatedAt);
index f0e57fdd0a0f23c431294446e6fc8a0a4c5e1228..98e0269eb6e43783be0056871d4826b64e2e861d 100644 (file)
@@ -19,6 +19,9 @@
  */
 package org.sonar.db.ce;
 
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
+
 public class CeActivityQuery {
 
   private boolean onlyCurrents = false;
@@ -26,11 +29,12 @@ public class CeActivityQuery {
   private CeActivityDto.Status status;
   private String type;
 
+  @CheckForNull
   public String getComponentUuid() {
     return componentUuid;
   }
 
-  public CeActivityQuery setComponentUuid(String componentUuid) {
+  public CeActivityQuery setComponentUuid(@Nullable String componentUuid) {
     this.componentUuid = componentUuid;
     return this;
   }
@@ -44,20 +48,22 @@ public class CeActivityQuery {
     return this;
   }
 
+  @CheckForNull
   public CeActivityDto.Status getStatus() {
     return status;
   }
 
-  public CeActivityQuery setStatus(CeActivityDto.Status status) {
+  public CeActivityQuery setStatus(@Nullable CeActivityDto.Status status) {
     this.status = status;
     return this;
   }
 
+  @CheckForNull
   public String getType() {
     return type;
   }
 
-  public CeActivityQuery setType(String type) {
+  public CeActivityQuery setType(@Nullable String type) {
     this.type = type;
     return this;
   }
index 9add9ad45d540c07d9b4a9f04e2ce626aeb5af1c..eb8f7d8cd27a24c24b357c4c30523c77c7b29208 100644 (file)
     order by ca.id desc
   </select>
 
+  <select id="countByQuery" parameterType="map" resultType="int">
+    select count(ca.id)
+    from ce_activity ca
+    <where>
+      <if test="query.onlyCurrents">
+        ca.is_last=${_true}
+      </if>
+      <if test="query.componentUuid != null">
+        ca.component_uuid=#{query.componentUuid}
+      </if>
+      <if test="query.status != null">
+        ca.status=#{query.status}
+      </if>
+      <if test="query.type != null">
+        ca.task_type=#{query.type}
+      </if>
+    </where>
+  </select>
+
   <insert id="insert" parameterType="org.sonar.db.ce.CeActivityDto" useGeneratedKeys="false">
     insert into ce_activity
     (uuid, component_uuid, status, task_type, is_last, is_last_key, submitter_login, submitted_at, started_at,
index 92e95d0cc2b201437e2b64e8a57242822aca1a45..8dfbf5c09a12810e3cf90b988d409fed79419798 100644 (file)
@@ -708,3 +708,5 @@ CREATE UNIQUE INDEX "PROJECT_QPROFILES_UNIQUE" ON "PROJECT_QPROFILES" ("PROJECT_
 CREATE UNIQUE INDEX "CE_QUEUE_UUID" ON "CE_QUEUE" ("UUID");
 
 CREATE UNIQUE INDEX "CE_ACTIVITY_UUID" ON "CE_ACTIVITY" ("UUID");
+
+CREATE INDEX "CE_ACTIVITY_COMPONENT_UUID" ON "CE_ACTIVITY" ("COMPONENT_UUID");
index c36ca55a0fd87a4b6d07affd766b400a2256d6bb..6031e116e20336f5e6eb0c85fccfbfec72deefc8 100644 (file)
@@ -110,6 +110,32 @@ public class CeActivityDaoTest {
     assertThat(dtos).extracting("uuid").containsExactly("TASK_4");
   }
 
+  @Test
+  public void test_countByQuery() throws Exception {
+    insert("TASK_1", REPORT, "PROJECT_1", CeActivityDto.Status.SUCCESS);
+    insert("TASK_2", REPORT, "PROJECT_1", CeActivityDto.Status.FAILED);
+    insert("TASK_3", REPORT, "PROJECT_2", CeActivityDto.Status.SUCCESS);
+    insert("TASK_4", "views", null, CeActivityDto.Status.SUCCESS);
+
+    // no filters
+    CeActivityQuery query = new CeActivityQuery();
+    assertThat(underTest.countByQuery(db.getSession(), query)).isEqualTo(4);
+
+    // select by component uuid
+    query = new CeActivityQuery().setComponentUuid("PROJECT_1");
+    assertThat(underTest.countByQuery(db.getSession(), query)).isEqualTo(2);
+
+    // select by status
+    query = new CeActivityQuery().setStatus(CeActivityDto.Status.SUCCESS);
+    assertThat(underTest.countByQuery(db.getSession(), query)).isEqualTo(3);
+
+    // select by type
+    query = new CeActivityQuery().setType(REPORT);
+    assertThat(underTest.countByQuery(db.getSession(), query)).isEqualTo(3);
+    query = new CeActivityQuery().setType("views");
+    assertThat(underTest.countByQuery(db.getSession(), query)).isEqualTo(1);
+  }
+
   @Test
   public void deleteOlderThan() throws Exception {
     insertWithDate("TASK_1", 1_450_000_000_000L);