package org.sonar.server.computation;
import com.google.common.base.Optional;
-import org.sonar.api.server.ServerSide;
-import org.sonar.api.utils.System2;
-import org.sonar.core.util.UuidFactory;
-import org.sonar.db.DbClient;
-import org.sonar.db.DbSession;
import org.sonar.db.ce.CeActivityDto;
-import org.sonar.db.ce.CeQueueDto;
-import org.sonar.server.computation.monitoring.CEQueueStatus;
-
-import static java.lang.String.format;
/**
* Queue of pending Compute Engine tasks. Both producer and consumer actions
* This class is decoupled from the regular task type {@link org.sonar.db.ce.CeTaskTypes#REPORT}.
* </p>
*/
-@ServerSide
-public class CeQueue {
-
- private final System2 system2;
- private final DbClient dbClient;
- private final UuidFactory uuidFactory;
- private final CEQueueStatus queueStatus;
- private final CeQueueListener[] listeners;
-
- // state
- private boolean submitPaused = false;
- private boolean peekPaused = false;
-
- public CeQueue(System2 system2, DbClient dbClient, UuidFactory uuidFactory,
- CEQueueStatus queueStatus, CeQueueListener[] listeners) {
- this.system2 = system2;
- this.dbClient = dbClient;
- this.uuidFactory = uuidFactory;
- this.queueStatus = queueStatus;
- this.listeners = listeners;
- }
-
- public CeTaskSubmit prepareSubmit() {
- return new CeTaskSubmit(uuidFactory.create());
- }
-
- public CeTask submit(CeTaskSubmit submit) {
- if (submitPaused) {
- throw new IllegalStateException("Compute Engine does not currently accept new tasks");
- }
- CeTask task = new CeTask(submit);
- DbSession dbSession = dbClient.openSession(false);
- try {
- CeQueueDto dto = new CeQueueDto();
- dto.setUuid(task.getUuid());
- dto.setTaskType(task.getType());
- dto.setComponentUuid(task.getComponentUuid());
- dto.setStatus(CeQueueDto.Status.PENDING);
- dto.setSubmitterLogin(task.getSubmitterLogin());
- dto.setStartedAt(null);
- dbClient.ceQueueDao().insert(dbSession, dto);
- dbSession.commit();
- queueStatus.addReceived();
- return task;
- } finally {
- dbClient.closeSession(dbSession);
- }
- }
-
- public Optional<CeTask> peek() {
- if (peekPaused) {
- return Optional.absent();
- }
- DbSession dbSession = dbClient.openSession(false);
- try {
- Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession);
- if (!dto.isPresent()) {
- return Optional.absent();
- }
- queueStatus.addInProgress();
- return Optional.of(new CeTask(dto.get()));
-
- } finally {
- dbClient.closeSession(dbSession);
- }
- }
+public interface CeQueue {
+ /**
+ * Build an instance of {@link TaskSubmission} required for {@link #submit(TaskSubmission)}. It allows
+ * to enforce that task ids are generated by the queue. It's used also for having access
+ * to the id before submitting the task to the queue.
+ */
+ TaskSubmission prepareSubmit();
- public boolean cancel(String taskUuid) {
- DbSession dbSession = dbClient.openSession(false);
- try {
- Optional<CeQueueDto> queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, taskUuid);
- if (queueDto.isPresent()) {
- if (!queueDto.get().getStatus().equals(CeQueueDto.Status.PENDING)) {
- throw new IllegalStateException(String.format("Task is in progress and can't be cancelled [uuid=%s]", taskUuid));
- }
- cancel(dbSession, queueDto.get());
- return true;
- }
- return false;
- } finally {
- dbClient.closeSession(dbSession);
- }
- }
+ /**
+ * Submits a task to the queue. The task is processed asynchronously.
+ * If submits are paused (see {@link #isSubmitPaused()}, then an
+ * unchecked exception is thrown.
+ */
+ CeTask submit(TaskSubmission submission);
- void cancel(DbSession dbSession, CeQueueDto q) {
- CeActivityDto activityDto = new CeActivityDto(q);
- activityDto.setStatus(CeActivityDto.Status.CANCELED);
- remove(dbSession, new CeTask(q), q, activityDto);
- }
+ /**
+ * Peek the oldest task in status {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}.
+ * The task status is changed to {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}.
+ * Does not return anything if the queue is paused (see {@link #isPeekPaused()}.
+ *
+ * <p>Only a single task can be peeked by project.</p>
+ *
+ * <p>An unchecked exception may be thrown on technical errors (db connection, ...).</p>
+ */
+ Optional<CeTask> peek();
+ /**
+ * Cancels a task in status {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}. An unchecked
+ * exception is thrown if the status is not {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}.
+ * The method does nothing and returns {@code false} if the task does not exist.
+ *
+ * @return true if the task exists and is successfully canceled.
+ */
+ boolean cancel(String taskUuid);
/**
* Removes all the tasks from the queue, whatever their status. They are marked
*
* @return the number of canceled tasks
*/
- public int clear() {
- return cancelAll(true);
- }
+ int clear();
/**
* Similar as {@link #clear()}, except that the tasks with status
*
* @return the number of canceled tasks
*/
- public int cancelAll() {
- return cancelAll(false);
- }
+ int cancelAll();
- private int cancelAll(boolean includeInProgress) {
- int count = 0;
- DbSession dbSession = dbClient.openSession(false);
- try {
- for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) {
- if (includeInProgress || !queueDto.getStatus().equals(CeQueueDto.Status.IN_PROGRESS)) {
- cancel(dbSession, queueDto);
- count++;
- }
- }
- return count;
- } finally {
- dbClient.closeSession(dbSession);
- }
- }
-
- public void remove(CeTask task, CeActivityDto.Status status) {
- DbSession dbSession = dbClient.openSession(false);
- try {
- Optional<CeQueueDto> queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, task.getUuid());
- if (!queueDto.isPresent()) {
- throw new IllegalStateException(format("Task does not exist anymore: %s", task));
- }
- CeActivityDto activityDto = new CeActivityDto(queueDto.get());
- activityDto.setStatus(status);
- Long startedAt = activityDto.getStartedAt();
- if (startedAt != null) {
- activityDto.setFinishedAt(system2.now());
- long executionTime = activityDto.getFinishedAt() - startedAt;
- activityDto.setExecutionTimeMs(executionTime);
- if (status == CeActivityDto.Status.SUCCESS) {
- queueStatus.addSuccess(executionTime);
- } else {
- queueStatus.addError(executionTime);
- }
- }
- remove(dbSession, task, queueDto.get(), activityDto);
-
- } finally {
- dbClient.closeSession(dbSession);
- }
- }
-
- private void remove(DbSession dbSession, CeTask task, CeQueueDto queueDto, CeActivityDto activityDto) {
- dbClient.ceActivityDao().insert(dbSession, activityDto);
- dbClient.ceQueueDao().deleteByUuid(dbSession, queueDto.getUuid());
- dbSession.commit();
- for (CeQueueListener listener : listeners) {
- listener.onRemoved(task, activityDto.getStatus());
- }
- }
+ /**
+ * Removes a task from the queue and registers it to past activities. This method
+ * is called by Compute Engine workers when task is processed.
+ *
+ * <p>An unchecked exception is thrown if the task does not exist in the queue.</p>
+ */
+ void remove(CeTask task, CeActivityDto.Status status);
- public void pauseSubmit() {
- this.submitPaused = true;
- }
+ void pauseSubmit();
- public void resumeSubmit() {
- this.submitPaused = false;
- }
+ void resumeSubmit();
- public boolean isSubmitPaused() {
- return submitPaused;
- }
+ boolean isSubmitPaused();
- public void pausePeek() {
- this.peekPaused = true;
- }
+ void pausePeek();
- public void resumePeek() {
- this.peekPaused = false;
- }
+ void resumePeek();
- public boolean isPeekPaused() {
- return peekPaused;
- }
+ boolean isPeekPaused();
}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.sonar.api.platform.ServerUpgradeStatus;
+import org.sonar.api.server.ServerSide;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.db.DbClient;
+import org.sonar.db.DbSession;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.db.ce.CeTaskTypes;
+
+/**
+ * Cleans-up the Compute Engine queue and resets the JMX counters.
+ * CE workers must not be started before execution of this class.
+ */
+@ServerSide
+public class CeQueueCleaner {
+
+ private static final Logger LOGGER = Loggers.get(CeQueueCleaner.class);
+
+ private final DbClient dbClient;
+ private final ServerUpgradeStatus serverUpgradeStatus;
+ private final ReportFiles reportFiles;
+ private final CeQueueImpl queue;
+
+ public CeQueueCleaner(DbClient dbClient, ServerUpgradeStatus serverUpgradeStatus, ReportFiles reportFiles, CeQueueImpl queue) {
+ this.dbClient = dbClient;
+ this.serverUpgradeStatus = serverUpgradeStatus;
+ this.reportFiles = reportFiles;
+ this.queue = queue;
+ }
+
+ public void clean(DbSession dbSession) {
+ if (serverUpgradeStatus.isUpgraded()) {
+ cleanOnUpgrade();
+ } else {
+ verifyConsistency(dbSession);
+ }
+ }
+
+ private void cleanOnUpgrade() {
+ // we assume that pending tasks are not compatible with the new version
+ // and can't be processed
+ LOGGER.info("Cancel all pending tasks (due to upgrade)");
+ queue.clear();
+ }
+
+ private void verifyConsistency(DbSession dbSession) {
+ // server is not being upgraded
+ dbClient.ceQueueDao().resetAllToPendingStatus(dbSession);
+ dbSession.commit();
+
+ // verify that the report files are available for the tasks in queue
+ Set<String> uuidsInQueue = new HashSet<>();
+ for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) {
+ uuidsInQueue.add(queueDto.getUuid());
+ if (CeTaskTypes.REPORT.equals(queueDto.getTaskType()) && !reportFiles.fileForUuid(queueDto.getUuid()).exists()) {
+ // the report is not available on file system
+ queue.cancel(dbSession, queueDto);
+ }
+ }
+
+ // clean-up filesystem
+ for (String uuid : reportFiles.listUuids()) {
+ if (!uuidsInQueue.contains(uuid)) {
+ reportFiles.deleteIfExists(uuid);
+ }
+ }
+ }
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.sonar.api.server.ServerSide;
+import org.sonar.api.utils.System2;
+import org.sonar.core.util.UuidFactory;
+import org.sonar.db.DbClient;
+import org.sonar.db.DbSession;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.server.computation.monitoring.CEQueueStatus;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.lang.String.format;
+
+@ServerSide
+public class CeQueueImpl implements CeQueue {
+
+ private final System2 system2;
+ private final DbClient dbClient;
+ private final UuidFactory uuidFactory;
+ private final CEQueueStatus queueStatus;
+ private final CeQueueListener[] listeners;
+
+ // state
+ private AtomicBoolean submitPaused = new AtomicBoolean(false);
+ private AtomicBoolean peekPaused = new AtomicBoolean(false);
+
+ public CeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory,
+ CEQueueStatus queueStatus, CeQueueListener[] listeners) {
+ this.system2 = system2;
+ this.dbClient = dbClient;
+ this.uuidFactory = uuidFactory;
+ this.queueStatus = queueStatus;
+ this.listeners = listeners;
+ }
+
+ @Override
+ public TaskSubmission prepareSubmit() {
+ return new TaskSubmissionImpl(uuidFactory.create());
+ }
+
+ @Override
+ public CeTask submit(TaskSubmission submission) {
+ checkArgument(!Strings.isNullOrEmpty(submission.getUuid()));
+ checkArgument(!Strings.isNullOrEmpty(submission.getType()));
+ checkArgument(submission instanceof TaskSubmissionImpl);
+ checkState(!submitPaused.get(), "Compute Engine does not currently accept new tasks");
+
+ CeTask task = new CeTask(submission);
+ DbSession dbSession = dbClient.openSession(false);
+ try {
+ CeQueueDto dto = createQueueDtoForSubmit(task);
+ dbClient.ceQueueDao().insert(dbSession, dto);
+ dbSession.commit();
+ queueStatus.addReceived();
+ return task;
+ } finally {
+ dbClient.closeSession(dbSession);
+ }
+ }
+
+ private CeQueueDto createQueueDtoForSubmit(CeTask task) {
+ CeQueueDto dto = new CeQueueDto();
+ dto.setUuid(task.getUuid());
+ dto.setTaskType(task.getType());
+ dto.setComponentUuid(task.getComponentUuid());
+ dto.setStatus(CeQueueDto.Status.PENDING);
+ dto.setSubmitterLogin(task.getSubmitterLogin());
+ dto.setStartedAt(null);
+ return dto;
+ }
+
+ @Override
+ public Optional<CeTask> peek() {
+ if (peekPaused.get()) {
+ return Optional.absent();
+ }
+ DbSession dbSession = dbClient.openSession(false);
+ try {
+ Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession);
+ CeTask task = null;
+ if (dto.isPresent()) {
+ queueStatus.addInProgress();
+ task = new CeTask(dto.get());
+ }
+ return Optional.fromNullable(task);
+
+ } finally {
+ dbClient.closeSession(dbSession);
+ }
+ }
+
+ @Override
+ public boolean cancel(String taskUuid) {
+ DbSession dbSession = dbClient.openSession(false);
+ try {
+ Optional<CeQueueDto> queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, taskUuid);
+ if (queueDto.isPresent()) {
+ checkState(CeQueueDto.Status.PENDING.equals(queueDto.get().getStatus()), "Task is in progress and can't be canceled [uuid=%s]", taskUuid);
+ cancel(dbSession, queueDto.get());
+ return true;
+ }
+ return false;
+ } finally {
+ dbClient.closeSession(dbSession);
+ }
+ }
+
+ void cancel(DbSession dbSession, CeQueueDto q) {
+ CeActivityDto activityDto = new CeActivityDto(q);
+ activityDto.setStatus(CeActivityDto.Status.CANCELED);
+ remove(dbSession, new CeTask(q), q, activityDto);
+ }
+
+ @Override
+ public int clear() {
+ return cancelAll(true);
+ }
+
+ @Override
+ public int cancelAll() {
+ return cancelAll(false);
+ }
+
+ private int cancelAll(boolean includeInProgress) {
+ int count = 0;
+ DbSession dbSession = dbClient.openSession(false);
+ try {
+ for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) {
+ if (includeInProgress || !queueDto.getStatus().equals(CeQueueDto.Status.IN_PROGRESS)) {
+ cancel(dbSession, queueDto);
+ count++;
+ }
+ }
+ return count;
+ } finally {
+ dbClient.closeSession(dbSession);
+ }
+ }
+
+ @Override
+ public void remove(CeTask task, CeActivityDto.Status status) {
+ DbSession dbSession = dbClient.openSession(false);
+ try {
+ Optional<CeQueueDto> queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, task.getUuid());
+ if (!queueDto.isPresent()) {
+ throw new IllegalStateException(format("Task does not exist anymore: %s", task));
+ }
+ CeActivityDto activityDto = new CeActivityDto(queueDto.get());
+ activityDto.setStatus(status);
+ updateQueueStatus(status, activityDto);
+ remove(dbSession, task, queueDto.get(), activityDto);
+
+ } finally {
+ dbClient.closeSession(dbSession);
+ }
+ }
+
+ private void updateQueueStatus(CeActivityDto.Status status, CeActivityDto activityDto) {
+ Long startedAt = activityDto.getStartedAt();
+ if (startedAt == null) {
+ return;
+ }
+ activityDto.setFinishedAt(system2.now());
+ long executionTime = activityDto.getFinishedAt() - startedAt;
+ activityDto.setExecutionTimeMs(executionTime);
+ if (status == CeActivityDto.Status.SUCCESS) {
+ queueStatus.addSuccess(executionTime);
+ } else {
+ queueStatus.addError(executionTime);
+ }
+ }
+
+ private void remove(DbSession dbSession, CeTask task, CeQueueDto queueDto, CeActivityDto activityDto) {
+ dbClient.ceActivityDao().insert(dbSession, activityDto);
+ dbClient.ceQueueDao().deleteByUuid(dbSession, queueDto.getUuid());
+ dbSession.commit();
+ for (CeQueueListener listener : listeners) {
+ listener.onRemoved(task, activityDto.getStatus());
+ }
+ }
+
+ @Override
+ public void pauseSubmit() {
+ this.submitPaused.set(true);
+ }
+
+ @Override
+ public void resumeSubmit() {
+ this.submitPaused.set(false);
+ }
+
+ @Override
+ public boolean isSubmitPaused() {
+ return submitPaused.get();
+ }
+
+ @Override
+ public void pausePeek() {
+ this.peekPaused.set(true);
+ }
+
+ @Override
+ public void resumePeek() {
+ this.peekPaused.set(false);
+ }
+
+ @Override
+ public boolean isPeekPaused() {
+ return peekPaused.get();
+ }
+
+ private static class TaskSubmissionImpl implements TaskSubmission {
+ private final String uuid;
+ private String type;
+ private String componentUuid;
+ private String submitterLogin;
+
+ private TaskSubmissionImpl(String uuid) {
+ this.uuid = uuid;
+ }
+
+ @Override
+ public String getUuid() {
+ return uuid;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @Override
+ public TaskSubmission setType(@Nullable String s) {
+ this.type = s;
+ return this;
+ }
+
+ @Override
+ public String getComponentUuid() {
+ return componentUuid;
+ }
+
+ @Override
+ public TaskSubmission setComponentUuid(@Nullable String s) {
+ this.componentUuid = s;
+ return this;
+ }
+
+ @Override
+ public String getSubmitterLogin() {
+ return submitterLogin;
+ }
+
+ @Override
+ public TaskSubmission setSubmitterLogin(@Nullable String s) {
+ this.submitterLogin = s;
+ return this;
+ }
+ }
+}
*/
package org.sonar.server.computation;
-import java.util.HashSet;
-import java.util.Set;
-import org.sonar.api.platform.ServerUpgradeStatus;
+import org.picocontainer.Startable;
import org.sonar.api.server.ServerSide;
-import org.sonar.api.utils.log.Logger;
-import org.sonar.api.utils.log.Loggers;
import org.sonar.db.DbClient;
import org.sonar.db.DbSession;
-import org.sonar.db.ce.CeQueueDto;
-import org.sonar.db.ce.CeTaskTypes;
import org.sonar.server.computation.monitoring.CEQueueStatus;
/**
- * Cleans-up the Compute Engine queue and resets the JMX counters.
- * CE workers must not be started before execution of this class.
+ * Cleans-up the queue, initializes JMX counters then schedule
+ * the execution of workers. That allows to not prevent workers
+ * from peeking the queue before it's ready.
*/
@ServerSide
-public class CeQueueInitializer {
-
- private static final Logger LOGGER = Loggers.get(CeQueueInitializer.class);
+public class CeQueueInitializer implements Startable {
private final DbClient dbClient;
- private final ServerUpgradeStatus serverUpgradeStatus;
- private final ReportFiles reportFiles;
- private final CeQueue queue;
private final CEQueueStatus queueStatus;
+ private final CeQueueCleaner cleaner;
+ private final ReportProcessingScheduler scheduler;
- public CeQueueInitializer(DbClient dbClient, ServerUpgradeStatus serverUpgradeStatus, ReportFiles reportFiles,
- CeQueue queue, CEQueueStatus queueStatus) {
+ public CeQueueInitializer(DbClient dbClient, CEQueueStatus queueStatus, CeQueueCleaner cleaner, ReportProcessingScheduler scheduler) {
this.dbClient = dbClient;
- this.serverUpgradeStatus = serverUpgradeStatus;
- this.reportFiles = reportFiles;
- this.queue = queue;
this.queueStatus = queueStatus;
+ this.cleaner = cleaner;
+ this.scheduler = scheduler;
}
- /**
- * Do not rename. Used at server startup.
- */
+ @Override
public void start() {
DbSession dbSession = dbClient.openSession(false);
try {
initJmxCounters(dbSession);
-
- if (serverUpgradeStatus.isUpgraded()) {
- cleanOnUpgrade();
- } else {
- verifyConsistency(dbSession);
- }
+ cleaner.clean(dbSession);
+ scheduler.schedule();
} finally {
dbClient.closeSession(dbSession);
}
}
- private void cleanOnUpgrade() {
- // we assume that pending tasks are not compatible with the new version
- // and can't be processed
- LOGGER.info("Cancel all pending tasks (due to upgrade)");
- queue.clear();
- }
-
- private void verifyConsistency(DbSession dbSession) {
- // server is not being upgraded
- dbClient.ceQueueDao().resetAllToPendingStatus(dbSession);
- dbSession.commit();
-
- // verify that the report files are available for the tasks in queue
- Set<String> uuidsInQueue = new HashSet<>();
- for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) {
- uuidsInQueue.add(queueDto.getUuid());
- if (CeTaskTypes.REPORT.equals(queueDto.getTaskType()) && !reportFiles.fileForUuid(queueDto.getUuid()).exists()) {
- // the report is not available on file system
- queue.cancel(dbSession, queueDto);
- }
- }
-
- // clean-up filesystem
- for (String uuid : reportFiles.listUuids()) {
- if (!uuidsInQueue.contains(uuid)) {
- reportFiles.deleteIfExists(uuid);
- }
- }
+ @Override
+ public void stop() {
+ // nothing to do
}
private void initJmxCounters(DbSession dbSession) {
import javax.annotation.concurrent.Immutable;
import org.sonar.db.ce.CeQueueDto;
+import static java.util.Objects.requireNonNull;
+
@Immutable
public class CeTask {
private final String submitterLogin;
public CeTask(String uuid, String type, @Nullable String componentUuid, @Nullable String submitterLogin) {
- this.uuid = uuid;
- this.type = type;
+ this.uuid = requireNonNull(uuid);
+ this.type = requireNonNull(type);
this.componentUuid = componentUuid;
this.submitterLogin = submitterLogin;
}
- CeTask(CeTaskSubmit submit) {
- this.uuid = submit.getUuid();
- this.type = submit.getType();
- this.componentUuid = submit.getComponentUuid();
- this.submitterLogin = submit.getSubmitterLogin();
+ CeTask(TaskSubmission submit) {
+ this(submit.getUuid(), submit.getType(), submit.getComponentUuid(), submit.getSubmitterLogin());
}
CeTask(CeQueueDto dto) {
- this.uuid = dto.getUuid();
- this.type = dto.getTaskType();
- this.componentUuid = dto.getComponentUuid();
- this.submitterLogin = dto.getSubmitterLogin();
+ this(dto.getUuid(), dto.getTaskType(), dto.getComponentUuid(), dto.getSubmitterLogin());
}
public String getUuid() {
}
@Override
- public boolean equals(Object o) {
+ public boolean equals(@Nullable Object o) {
if (this == o) {
return true;
}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation;
-
-public class CeTaskSubmit {
-
- private final String uuid;
- private String type;
- private String componentUuid;
- private String submitterLogin;
-
- CeTaskSubmit(String uuid) {
- this.uuid = uuid;
- }
-
- public String getUuid() {
- return uuid;
- }
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public String getComponentUuid() {
- return componentUuid;
- }
-
- public void setComponentUuid(String s) {
- this.componentUuid = s;
- }
-
- public String getSubmitterLogin() {
- return submitterLogin;
- }
-
- public void setSubmitterLogin(String s) {
- this.submitterLogin = s;
- }
-}
this.settings = settings;
}
- public void save(CeTaskSubmit taskSubmit, InputStream reportInput) {
+ public void save(TaskSubmission taskSubmit, InputStream reportInput) {
File file = fileForUuid(taskSubmit.getUuid());
try {
FileUtils.copyInputStreamToFile(reportInput, file);
package org.sonar.server.computation;
import java.util.concurrent.TimeUnit;
-import org.sonar.api.platform.Server;
-import org.sonar.api.platform.ServerStartHandler;
/**
* Adds tasks to the Compute Engine to process batch reports.
*/
-public class ReportProcessingScheduler implements ServerStartHandler {
+public class ReportProcessingScheduler {
private final ReportProcessingSchedulerExecutorService reportProcessingSchedulerExecutorService;
private final ComputeEngineProcessingQueue processingQueue;
private final CeWorker worker;
this.processingQueue = processingQueue;
this.worker = worker;
- this.delayBetweenTasks = 10;
+ this.delayBetweenTasks = 1;
this.delayForFirstStart = 0;
this.timeUnit = TimeUnit.SECONDS;
}
- public void startAnalysisTaskNow() {
- reportProcessingSchedulerExecutorService.execute(new AddReportProcessingToCEProcessingQueue());
- }
-
- @Override
- public void onServerStart(Server server) {
+ public void schedule() {
reportProcessingSchedulerExecutorService.scheduleAtFixedRate(new AddReportProcessingToCEProcessingQueue(), delayForFirstStart, delayBetweenTasks, timeUnit);
}
import org.sonar.server.util.StoppableScheduledExecutorService;
/**
- * ExecutorService responsible for adding {@link CeWorkerImpl} to {@link CeQueue} on a regular basis.
+ * ExecutorService responsible for adding {@link CeWorkerImpl} to {@link CeQueueImpl} on a regular basis.
*/
public interface ReportProcessingSchedulerExecutorService extends StoppableScheduledExecutorService {
}
}
// the report file must be saved before submitting the task
- CeTaskSubmit submit = queue.prepareSubmit();
+ TaskSubmission submit = queue.prepareSubmit();
reportFiles.save(submit, reportInput);
submit.setType(CeTaskTypes.REPORT);
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
+
+public interface TaskSubmission {
+
+ String getUuid();
+
+ String getType();
+
+ TaskSubmission setType(String s);
+
+ @CheckForNull
+ String getComponentUuid();
+
+ TaskSubmission setComponentUuid(@Nullable String s);
+
+ @CheckForNull
+ String getSubmitterLogin();
+
+ TaskSubmission setSubmitterLogin(@Nullable String s);
+
+}
import org.sonar.api.server.ws.Response;
import org.sonar.api.server.ws.WebService;
import org.sonar.api.web.UserRole;
+import org.sonar.core.util.Uuids;
import org.sonar.db.DbClient;
import org.sonar.db.DbSession;
import org.sonar.db.ce.CeActivityDto;
import org.sonar.db.ce.CeActivityQuery;
+import org.sonar.db.ce.CeTaskTypes;
import org.sonar.server.user.UserSession;
import org.sonar.server.ws.WsUtils;
import org.sonarqube.ws.Common;
*/
public class CeActivityWsAction implements CeWsAction {
+ private static final String PARAM_COMPONENT_ID = "componentId";
+ private static final String PARAM_TYPE = "type";
+ private static final String PARAM_STATUS = "status";
+ private static final String PARAM_ONLY_CURRENTS = "onlyCurrents";
+
private final UserSession userSession;
private final DbClient dbClient;
private final CeWsTaskFormatter formatter;
@Override
public void define(WebService.NewController controller) {
- controller.createAction("activity")
+ WebService.NewAction action = controller.createAction("activity")
.setInternal(true)
.setResponseExample(getClass().getResource("CeActivityWsAction/example.json"))
.setHandler(this);
+ action.createParam(PARAM_COMPONENT_ID)
+ .setDescription("Optional id of the component (project) to filter on")
+ .setExampleValue(Uuids.UUID_EXAMPLE_03);
+ action.createParam(PARAM_STATUS)
+ .setDescription("Optional filter on task status")
+ .setPossibleValues(CeActivityDto.Status.values());
+ action.createParam(PARAM_ONLY_CURRENTS)
+ .setDescription("Optional filter on the current activities (only the most recent task by project)")
+ .setBooleanPossibleValues()
+ .setDefaultValue("false");
+ action.createParam(PARAM_TYPE)
+ .setDescription("Optional filter on task type")
+ .setExampleValue(CeTaskTypes.REPORT);
+ action.addPagingParams(10);
}
@Override
public void handle(Request wsRequest, Response wsResponse) throws Exception {
- userSession.checkGlobalPermission(UserRole.ADMIN);
DbSession dbSession = dbClient.openSession(false);
try {
- CeActivityQuery query = new CeActivityQuery();
- List<CeActivityDto> dtos = dbClient.ceActivityDao().selectByQuery(dbSession, query, new RowBounds(0, 100));
+ CeActivityQuery query = readQuery(wsRequest);
+ RowBounds rowBounds = readMyBatisRowBounds(wsRequest);
+ List<CeActivityDto> dtos = dbClient.ceActivityDao().selectByQuery(dbSession, query, rowBounds);
+ int total = dbClient.ceActivityDao().countByQuery(dbSession, query);
WsCe.ActivityResponse.Builder wsResponseBuilder = WsCe.ActivityResponse.newBuilder();
wsResponseBuilder.addAllTasks(formatter.formatActivity(dbSession, dtos));
- Common.Paging paging = Common.Paging.newBuilder()
- .setPageIndex(1)
- .setPageSize(dtos.size())
- .setTotal(dtos.size())
- .build();
- wsResponseBuilder.setPaging(paging);
+ wsResponseBuilder.setPaging(Common.Paging.newBuilder()
+ .setPageIndex(wsRequest.mandatoryParamAsInt(WebService.Param.PAGE))
+ .setPageSize(wsRequest.mandatoryParamAsInt(WebService.Param.PAGE_SIZE))
+ .setTotal(total));
WsUtils.writeProtobuf(wsResponseBuilder.build(), wsRequest, wsResponse);
} finally {
dbClient.closeSession(dbSession);
}
}
+
+ private CeActivityQuery readQuery(Request wsRequest) {
+ CeActivityQuery query = new CeActivityQuery();
+ query.setType(wsRequest.param(PARAM_TYPE));
+ query.setOnlyCurrents(wsRequest.mandatoryParamAsBoolean(PARAM_ONLY_CURRENTS));
+
+ String status = wsRequest.param(PARAM_STATUS);
+ if (status != null) {
+ query.setStatus(CeActivityDto.Status.valueOf(status));
+ }
+
+ String componentId = wsRequest.param(PARAM_COMPONENT_ID);
+ if (componentId == null) {
+ userSession.checkGlobalPermission(UserRole.ADMIN);
+ } else {
+ userSession.checkProjectUuidPermission(UserRole.ADMIN, componentId);
+ query.setComponentUuid(componentId);
+ }
+ return query;
+ }
+
+ private static RowBounds readMyBatisRowBounds(Request wsRequest) {
+ int pageIndex = wsRequest.mandatoryParamAsInt(WebService.Param.PAGE);
+ int pageSize = wsRequest.mandatoryParamAsInt(WebService.Param.PAGE_SIZE);
+ return new RowBounds((pageIndex - 1) * pageSize, pageSize);
+ }
}
*/
package org.sonar.server.computation.ws;
-import com.google.common.base.Strings;
import java.io.InputStream;
import org.apache.commons.lang.StringUtils;
import org.sonar.api.server.ws.Request;
import org.sonar.api.server.ws.Response;
import org.sonar.api.server.ws.WebService;
import org.sonar.server.computation.CeTask;
-import org.sonar.server.computation.ReportProcessingScheduler;
import org.sonar.server.computation.ReportSubmitter;
import org.sonar.server.ws.WsUtils;
import org.sonarqube.ws.WsCe;
public static final String PARAM_REPORT_DATA = "report";
private final ReportSubmitter reportSubmitter;
- private final ReportProcessingScheduler reportProcessingScheduler;
- public CeSubmitWsAction(ReportSubmitter reportSubmitter, ReportProcessingScheduler reportProcessingScheduler) {
+ public CeSubmitWsAction(ReportSubmitter reportSubmitter) {
this.reportSubmitter = reportSubmitter;
- this.reportProcessingScheduler = reportProcessingScheduler;
}
@Override
@Override
public void handle(Request wsRequest, Response wsResponse) throws Exception {
String projectKey = wsRequest.mandatoryParam(PARAM_PROJECT_KEY);
- String projectBranch = Strings.emptyToNull(wsRequest.param(PARAM_PROJECT_BRANCH));
+ String projectBranch = wsRequest.param(PARAM_PROJECT_BRANCH);
String projectName = StringUtils.defaultIfBlank(wsRequest.param(PARAM_PROJECT_NAME), projectKey);
InputStream reportInput = wsRequest.paramAsInputStream(PARAM_REPORT_DATA);
CeTask task = reportSubmitter.submit(projectKey, projectBranch, projectName, reportInput);
- reportProcessingScheduler.startAnalysisTaskNow();
WsCe.SubmitResponse submitResponse = WsCe.SubmitResponse.newBuilder()
.setTaskId(task.getUuid())
import org.sonar.server.component.ws.ComponentsWs;
import org.sonar.server.component.ws.EventsWs;
import org.sonar.server.component.ws.ResourcesWs;
-import org.sonar.server.computation.CeQueue;
+import org.sonar.server.computation.CeQueueImpl;
+import org.sonar.server.computation.CeQueueCleaner;
import org.sonar.server.computation.CeQueueInitializer;
import org.sonar.server.computation.CleanReportQueueListener;
import org.sonar.server.computation.ComputeEngineProcessingModule;
// Compute engine
CEQueueStatusImpl.class,
ComputeEngineQueueMonitor.class,
- CeQueue.class,
+ CeQueueImpl.class,
CleanReportQueueListener.class,
ReportFiles.class,
ComputeEngineProcessingModule.class,
ProjectSettingsFactory.class,
IndexPurgeListener.class,
ReportSubmitter.class,
+ CeQueueCleaner.class,
CeQueueInitializer.class,
// Views plugin
ViewsBootstrap.class,
if (!delegate.awaitTermination(5, TimeUnit.SECONDS)) {
// Cancel currently executing tasks
delegate.shutdownNow();
- // Wait a while for tasks to respond to being cancelled
+ // Wait a while for tasks to respond to being canceled
if (!delegate.awaitTermination(5, TimeUnit.SECONDS)) {
Loggers.get(getClass()).error(format("Pool %s did not terminate", getClass().getSimpleName()));
}
{
+ "paging": {
+ "pageIndex": 1,
+ "pageSize": 10,
+ "total": 233
+ },
"tasks": [
{
"id": "BU_dO1vsORa8_beWCwsP",
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation;
+
+import java.io.File;
+import java.io.IOException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.sonar.api.platform.ServerUpgradeStatus;
+import org.sonar.api.utils.System2;
+import org.sonar.db.DbSession;
+import org.sonar.db.DbTester;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.db.ce.CeTaskTypes;
+
+import static java.util.Arrays.asList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CeQueueCleanerTest {
+
+ @Rule
+ public DbTester dbTester = DbTester.create(System2.INSTANCE);
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ ServerUpgradeStatus serverUpgradeStatus = mock(ServerUpgradeStatus.class);
+ ReportFiles reportFiles = mock(ReportFiles.class, Mockito.RETURNS_DEEP_STUBS);
+ CeQueueImpl queue = mock(CeQueueImpl.class);
+ CeQueueCleaner underTest = new CeQueueCleaner(dbTester.getDbClient(), serverUpgradeStatus, reportFiles, queue);
+
+ @Test
+ public void reset_in_progress_tasks_to_pending() throws IOException {
+ insertInQueue("TASK_1", CeQueueDto.Status.PENDING);
+ insertInQueue("TASK_2", CeQueueDto.Status.IN_PROGRESS);
+
+ underTest.clean(dbTester.getSession());
+
+ assertThat(dbTester.getDbClient().ceQueueDao().countByStatus(dbTester.getSession(), CeQueueDto.Status.PENDING)).isEqualTo(2);
+ assertThat(dbTester.getDbClient().ceQueueDao().countByStatus(dbTester.getSession(), CeQueueDto.Status.IN_PROGRESS)).isEqualTo(0);
+ }
+
+ @Test
+ public void clear_queue_if_version_upgrade() {
+ when(serverUpgradeStatus.isUpgraded()).thenReturn(true);
+
+ underTest.clean(dbTester.getSession());
+
+ verify(queue).clear();
+ }
+
+ @Test
+ public void cancel_task_if_report_file_is_missing() throws IOException {
+ CeQueueDto task = insertInQueue("TASK_1", CeQueueDto.Status.PENDING, false);
+
+ underTest.clean(dbTester.getSession());
+
+ verify(queue).cancel(any(DbSession.class), eq(task));
+ }
+
+ @Test
+ public void delete_orphan_report_files() throws Exception {
+ // two files on disk but on task in queue
+ insertInQueue("TASK_1", CeQueueDto.Status.PENDING, true);
+ when(reportFiles.listUuids()).thenReturn(asList("TASK_1", "TASK_2"));
+
+ underTest.clean(dbTester.getSession());
+
+ verify(reportFiles).deleteIfExists("TASK_2");
+ }
+
+ private void insertInQueue(String taskUuid, CeQueueDto.Status status) throws IOException {
+ insertInQueue(taskUuid, status, true);
+ }
+
+ private CeQueueDto insertInQueue(String taskUuid, CeQueueDto.Status status, boolean createFile) throws IOException {
+ CeQueueDto queueDto = new CeQueueDto();
+ queueDto.setTaskType(CeTaskTypes.REPORT);
+ queueDto.setComponentUuid("PROJECT_1");
+ queueDto.setUuid(taskUuid);
+ queueDto.setStatus(status);
+ dbTester.getDbClient().ceQueueDao().insert(dbTester.getSession(), queueDto);
+ dbTester.getSession().commit();
+
+ File file = tempFolder.newFile();
+ when(reportFiles.fileForUuid(taskUuid)).thenReturn(file);
+ if (!createFile) {
+ file.delete();
+ }
+ return queueDto;
+ }
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation;
+
+import com.google.common.base.Optional;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.sonar.api.utils.System2;
+import org.sonar.api.utils.internal.TestSystem2;
+import org.sonar.core.util.UuidFactory;
+import org.sonar.core.util.UuidFactoryImpl;
+import org.sonar.db.DbTester;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.server.computation.monitoring.CEQueueStatus;
+import org.sonar.server.computation.monitoring.CEQueueStatusImpl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.startsWith;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+public class CeQueueImplTest {
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ System2 system2 = new TestSystem2().setNow(1_450_000_000_000L);
+
+ @Rule
+ public DbTester dbTester = DbTester.create(system2);
+
+ UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE;
+ CEQueueStatus queueStatus = new CEQueueStatusImpl();
+ CeQueueListener listener = mock(CeQueueListener.class);
+ CeQueue underTest = new CeQueueImpl(system2, dbTester.getDbClient(), uuidFactory, queueStatus, new CeQueueListener[] {listener});
+
+ @Test
+ public void test_submit() {
+ TaskSubmission submission = underTest.prepareSubmit();
+ submission.setComponentUuid("PROJECT_1");
+ submission.setType(CeTaskTypes.REPORT);
+ submission.setSubmitterLogin("rob");
+
+ CeTask task = underTest.submit(submission);
+ assertThat(task.getUuid()).isEqualTo(submission.getUuid());
+ assertThat(task.getComponentUuid()).isEqualTo("PROJECT_1");
+ assertThat(task.getSubmitterLogin()).isEqualTo("rob");
+
+ Optional<CeQueueDto> queueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), submission.getUuid());
+ assertThat(queueDto.isPresent()).isTrue();
+ assertThat(queueDto.get().getTaskType()).isEqualTo(CeTaskTypes.REPORT);
+ assertThat(queueDto.get().getComponentUuid()).isEqualTo("PROJECT_1");
+ assertThat(queueDto.get().getSubmitterLogin()).isEqualTo("rob");
+ assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L);
+ assertThat(queueStatus.getReceivedCount()).isEqualTo(1L);
+ }
+
+ @Test
+ public void fail_to_submit_if_paused() {
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Compute Engine does not currently accept new tasks");
+ underTest.pauseSubmit();
+
+ submit(CeTaskTypes.REPORT, "PROJECT_1");
+ }
+
+ @Test
+ public void test_remove() {
+ CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+ Optional<CeTask> peek = underTest.peek();
+ underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS);
+
+ // queue is empty
+ assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), task.getUuid()).isPresent()).isFalse();
+ assertThat(underTest.peek().isPresent()).isFalse();
+
+ // available in history
+ Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid());
+ assertThat(history.isPresent()).isTrue();
+ assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.SUCCESS);
+ assertThat(history.get().getIsLast()).isTrue();
+
+ verify(listener).onRemoved(task, CeActivityDto.Status.SUCCESS);
+ }
+
+ @Test
+ public void fail_to_remove_if_not_in_queue() throws Exception {
+ expectedException.expect(IllegalStateException.class);
+ CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+ underTest.remove(task, CeActivityDto.Status.SUCCESS);
+
+ // fail
+ underTest.remove(task, CeActivityDto.Status.SUCCESS);
+ }
+
+ @Test
+ public void test_peek() throws Exception {
+ CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+
+ Optional<CeTask> peek = underTest.peek();
+ assertThat(peek.isPresent()).isTrue();
+ assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
+ assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT);
+ assertThat(peek.get().getComponentUuid()).isEqualTo("PROJECT_1");
+
+ // no more pending tasks
+ peek = underTest.peek();
+ assertThat(peek.isPresent()).isFalse();
+
+ verify(listener, never()).onRemoved(eq(task), any(CeActivityDto.Status.class));
+ }
+
+ @Test
+ public void peek_nothing_if_paused() throws Exception {
+ submit(CeTaskTypes.REPORT, "PROJECT_1");
+ underTest.pausePeek();
+
+ Optional<CeTask> peek = underTest.peek();
+ assertThat(peek.isPresent()).isFalse();
+ }
+
+ @Test
+ public void cancel_pending() throws Exception {
+ CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+
+ // ignore
+ boolean canceled = underTest.cancel("UNKNOWN");
+ assertThat(canceled).isFalse();
+ verifyZeroInteractions(listener);
+
+ canceled = underTest.cancel(task.getUuid());
+ assertThat(canceled).isTrue();
+ Optional<CeActivityDto> activity = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid());
+ assertThat(activity.isPresent()).isTrue();
+ assertThat(activity.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
+ verify(listener).onRemoved(task, CeActivityDto.Status.CANCELED);
+ }
+
+ @Test
+ public void fail_to_cancel_if_in_progress() throws Exception {
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage(startsWith("Task is in progress and can't be canceled"));
+
+ CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+ underTest.peek();
+
+ underTest.cancel(task.getUuid());
+ }
+
+ @Test
+ public void cancelAll_pendings_but_not_in_progress() throws Exception {
+ CeTask inProgressTask = submit(CeTaskTypes.REPORT, "PROJECT_1");
+ CeTask pendingTask1 = submit(CeTaskTypes.REPORT, "PROJECT_2");
+ CeTask pendingTask2 = submit(CeTaskTypes.REPORT, "PROJECT_3");
+ underTest.peek();
+
+ int canceledCount = underTest.cancelAll();
+ assertThat(canceledCount).isEqualTo(2);
+
+ Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), pendingTask1.getUuid());
+ assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
+ history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), pendingTask2.getUuid());
+ assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
+ history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), inProgressTask.getUuid());
+ assertThat(history.isPresent()).isFalse();
+
+ verify(listener).onRemoved(pendingTask1, CeActivityDto.Status.CANCELED);
+ verify(listener).onRemoved(pendingTask2, CeActivityDto.Status.CANCELED);
+ }
+
+ @Test
+ public void pause_and_resume_submits() throws Exception {
+ assertThat(underTest.isSubmitPaused()).isFalse();
+ underTest.pauseSubmit();
+ assertThat(underTest.isSubmitPaused()).isTrue();
+ underTest.resumeSubmit();
+ assertThat(underTest.isSubmitPaused()).isFalse();
+ }
+
+ @Test
+ public void pause_and_resume_peeks() throws Exception {
+ assertThat(underTest.isPeekPaused()).isFalse();
+ underTest.pausePeek();
+ assertThat(underTest.isPeekPaused()).isTrue();
+ underTest.resumePeek();
+ assertThat(underTest.isPeekPaused()).isFalse();
+ }
+
+ private CeTask submit(String reportType, String componentUuid) {
+ TaskSubmission submission = underTest.prepareSubmit();
+ submission.setType(reportType);
+ submission.setComponentUuid(componentUuid);
+ return underTest.submit(submission);
+ }
+}
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.mockito.InOrder;
import org.mockito.Mockito;
-import org.sonar.api.platform.ServerUpgradeStatus;
import org.sonar.api.utils.System2;
import org.sonar.db.DbSession;
import org.sonar.db.DbTester;
import org.sonar.server.computation.monitoring.CEQueueStatus;
import org.sonar.server.computation.monitoring.CEQueueStatusImpl;
-import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class CeQueueInitializerTest {
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
- ServerUpgradeStatus serverUpgradeStatus = mock(ServerUpgradeStatus.class);
ReportFiles reportFiles = mock(ReportFiles.class, Mockito.RETURNS_DEEP_STUBS);
- CeQueue queue = mock(CeQueue.class);
CEQueueStatus queueStatus = new CEQueueStatusImpl();
- CeQueueInitializer underTest = new CeQueueInitializer(dbTester.getDbClient(), serverUpgradeStatus, reportFiles, queue, queueStatus);
+ CeQueueCleaner cleaner = mock(CeQueueCleaner.class);
+ ReportProcessingScheduler scheduler = mock(ReportProcessingScheduler.class);
+ CeQueueInitializer underTest = new CeQueueInitializer(dbTester.getDbClient(), queueStatus, cleaner, scheduler);
@Test
public void init_jmx_counters() throws IOException {
}
@Test
- public void reset_in_progress_tasks_to_pending() throws IOException {
- insertInQueue("TASK_1", CeQueueDto.Status.PENDING);
- insertInQueue("TASK_2", CeQueueDto.Status.IN_PROGRESS);
-
- underTest.start();
-
- assertThat(dbTester.getDbClient().ceQueueDao().countByStatus(dbTester.getSession(), CeQueueDto.Status.PENDING)).isEqualTo(2);
- assertThat(dbTester.getDbClient().ceQueueDao().countByStatus(dbTester.getSession(), CeQueueDto.Status.IN_PROGRESS)).isEqualTo(0);
- }
-
- @Test
- public void clear_queue_if_version_upgrade() {
- when(serverUpgradeStatus.isUpgraded()).thenReturn(true);
-
- underTest.start();
-
- verify(queue).clear();
- }
-
- @Test
- public void cancel_task_if_report_file_is_missing() throws IOException {
- CeQueueDto task = insertInQueue("TASK_1", CeQueueDto.Status.PENDING, false);
-
- underTest.start();
-
- verify(queue).cancel(any(DbSession.class), eq(task));
- }
-
- @Test
- public void delete_orphan_report_files() throws Exception {
- // two files on disk but on task in queue
- insertInQueue("TASK_1", CeQueueDto.Status.PENDING, true);
- when(reportFiles.listUuids()).thenReturn(asList("TASK_1", "TASK_2"));
+ public void clean_queue_then_start_scheduler_of_workers() throws IOException {
+ InOrder inOrder = Mockito.inOrder(cleaner, scheduler);
underTest.start();
- verify(reportFiles).deleteIfExists("TASK_2");
+ inOrder.verify(cleaner).clean(any(DbSession.class));
+ inOrder.verify(scheduler).schedule();
}
private void insertInQueue(String taskUuid, CeQueueDto.Status status) throws IOException {
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation;
-
-import com.google.common.base.Optional;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.sonar.api.utils.System2;
-import org.sonar.api.utils.internal.TestSystem2;
-import org.sonar.core.util.UuidFactory;
-import org.sonar.core.util.UuidFactoryImpl;
-import org.sonar.db.DbTester;
-import org.sonar.db.ce.CeActivityDto;
-import org.sonar.db.ce.CeQueueDto;
-import org.sonar.db.ce.CeTaskTypes;
-import org.sonar.server.computation.monitoring.CEQueueStatus;
-import org.sonar.server.computation.monitoring.CEQueueStatusImpl;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.hamcrest.Matchers.startsWith;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-
-public class CeQueueTest {
-
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
- System2 system2 = new TestSystem2().setNow(1_450_000_000_000L);
-
- @Rule
- public DbTester dbTester = DbTester.create(system2);
-
- UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE;
- CEQueueStatus queueStatus = new CEQueueStatusImpl();
- CeQueueListener listener = mock(CeQueueListener.class);
- CeQueue underTest = new CeQueue(system2, dbTester.getDbClient(), uuidFactory, queueStatus, new CeQueueListener[] {listener});
-
- @Test
- public void test_submit() {
- CeTaskSubmit submit = underTest.prepareSubmit();
- submit.setComponentUuid("PROJECT_1");
- submit.setType(CeTaskTypes.REPORT);
- submit.setSubmitterLogin("rob");
-
- CeTask task = underTest.submit(submit);
- assertThat(task.getUuid()).isEqualTo(submit.getUuid());
- assertThat(task.getComponentUuid()).isEqualTo("PROJECT_1");
- assertThat(task.getSubmitterLogin()).isEqualTo("rob");
-
- Optional<CeQueueDto> queueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), submit.getUuid());
- assertThat(queueDto.isPresent()).isTrue();
- assertThat(queueDto.get().getTaskType()).isEqualTo(CeTaskTypes.REPORT);
- assertThat(queueDto.get().getComponentUuid()).isEqualTo("PROJECT_1");
- assertThat(queueDto.get().getSubmitterLogin()).isEqualTo("rob");
- assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L);
- assertThat(queueStatus.getReceivedCount()).isEqualTo(1L);
- }
-
- @Test
- public void fail_to_submit_if_paused() {
- expectedException.expect(IllegalStateException.class);
- expectedException.expectMessage("Compute Engine does not currently accept new tasks");
- underTest.pauseSubmit();
-
- submit(CeTaskTypes.REPORT, "PROJECT_1");
- }
-
- @Test
- public void test_remove() {
- CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
- Optional<CeTask> peek = underTest.peek();
- underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS);
-
- // queue is empty
- assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), task.getUuid()).isPresent()).isFalse();
- assertThat(underTest.peek().isPresent()).isFalse();
-
- // available in history
- Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid());
- assertThat(history.isPresent()).isTrue();
- assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.SUCCESS);
- assertThat(history.get().getIsLast()).isTrue();
-
- verify(listener).onRemoved(task, CeActivityDto.Status.SUCCESS);
- }
-
- @Test
- public void fail_to_remove_if_not_in_queue() throws Exception {
- expectedException.expect(IllegalStateException.class);
- CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
- underTest.remove(task, CeActivityDto.Status.SUCCESS);
-
- // fail
- underTest.remove(task, CeActivityDto.Status.SUCCESS);
- }
-
- @Test
- public void test_peek() throws Exception {
- CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
-
- Optional<CeTask> peek = underTest.peek();
- assertThat(peek.isPresent()).isTrue();
- assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
- assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT);
- assertThat(peek.get().getComponentUuid()).isEqualTo("PROJECT_1");
-
- // no more pending tasks
- peek = underTest.peek();
- assertThat(peek.isPresent()).isFalse();
-
- verify(listener, never()).onRemoved(eq(task), any(CeActivityDto.Status.class));
- }
-
- @Test
- public void peek_nothing_if_paused() throws Exception {
- submit(CeTaskTypes.REPORT, "PROJECT_1");
- underTest.pausePeek();
-
- Optional<CeTask> peek = underTest.peek();
- assertThat(peek.isPresent()).isFalse();
- }
-
- @Test
- public void cancel_pending() throws Exception {
- CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
-
- // ignore
- boolean canceled = underTest.cancel("UNKNOWN");
- assertThat(canceled).isFalse();
- verifyZeroInteractions(listener);
-
- canceled = underTest.cancel(task.getUuid());
- assertThat(canceled).isTrue();
- Optional<CeActivityDto> activity = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid());
- assertThat(activity.isPresent()).isTrue();
- assertThat(activity.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
- verify(listener).onRemoved(task, CeActivityDto.Status.CANCELED);
- }
-
- @Test
- public void fail_to_cancel_if_in_progress() throws Exception {
- expectedException.expect(IllegalStateException.class);
- expectedException.expectMessage(startsWith("Task is in progress and can't be cancelled"));
-
- CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
- underTest.peek();
-
- underTest.cancel(task.getUuid());
- }
-
- @Test
- public void cancelAll_pendings_but_not_in_progress() throws Exception {
- CeTask inProgressTask = submit(CeTaskTypes.REPORT, "PROJECT_1");
- CeTask pendingTask1 = submit(CeTaskTypes.REPORT, "PROJECT_2");
- CeTask pendingTask2 = submit(CeTaskTypes.REPORT, "PROJECT_3");
- underTest.peek();
-
- int canceledCount = underTest.cancelAll();
- assertThat(canceledCount).isEqualTo(2);
-
- Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), pendingTask1.getUuid());
- assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
- history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), pendingTask2.getUuid());
- assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
- history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), inProgressTask.getUuid());
- assertThat(history.isPresent()).isFalse();
-
- verify(listener).onRemoved(pendingTask1, CeActivityDto.Status.CANCELED);
- verify(listener).onRemoved(pendingTask2, CeActivityDto.Status.CANCELED);
- }
-
- @Test
- public void pause_and_resume_submits() throws Exception {
- assertThat(underTest.isSubmitPaused()).isFalse();
- underTest.pauseSubmit();
- assertThat(underTest.isSubmitPaused()).isTrue();
- underTest.resumeSubmit();
- assertThat(underTest.isSubmitPaused()).isFalse();
- }
-
- @Test
- public void pause_and_resume_peeks() throws Exception {
- assertThat(underTest.isPeekPaused()).isFalse();
- underTest.pausePeek();
- assertThat(underTest.isPeekPaused()).isTrue();
- underTest.resumePeek();
- assertThat(underTest.isPeekPaused()).isFalse();
- }
-
- private CeTask submit(String reportType, String componentUuid) {
- CeTaskSubmit submit = underTest.prepareSubmit();
- submit.setType(reportType);
- submit.setComponentUuid(componentUuid);
- return underTest.submit(submit);
- }
-}
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class CeWorkerImplTest {
- CeQueue queue = mock(CeQueue.class);
+ CeQueue queue = mock(CeQueueImpl.class);
ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
CeWorker underTest = new CeWorkerImpl(queue, taskProcessor);
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.sonar.api.platform.Server;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@Test
public void schedule_at_fixed_rate_adding_a_ReportProcessingTask_to_the_queue() throws Exception {
- when(batchExecutorService.scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(10L), eq(TimeUnit.SECONDS)))
+ when(batchExecutorService.scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(1L), eq(TimeUnit.SECONDS)))
.thenAnswer(new ExecuteFirstArgAsRunnable());
- underTest.onServerStart(mock(Server.class));
-
- assertThat(processingQueue.getTasks()).hasSize(1);
- assertThat(processingQueue.getTasks().iterator().next()).isInstanceOf(CeWorker.class);
- }
-
- @Test
- public void adds_immediately_a_ReportProcessingTask_to_the_queue() throws Exception {
- doAnswer(new ExecuteFirstArgAsRunnable()).when(batchExecutorService).execute(any(Runnable.class));
-
- underTest.startAnalysisTaskNow();
+ underTest.schedule();
assertThat(processingQueue.getTasks()).hasSize(1);
assertThat(processingQueue.getTasks().iterator().next()).isInstanceOf(CeWorker.class);
@Rule
public UserSessionRule userSession = UserSessionRule.standalone();
- CeQueue queue = mock(CeQueue.class);
+ CeQueue queue = mock(CeQueueImpl.class);
ReportFiles reportFiles = mock(ReportFiles.class);
ComponentService componentService = mock(ComponentService.class);
ReportSubmitter underTest = new ReportSubmitter(queue, userSession, reportFiles, componentService);
@Test
public void submit_a_report_on_existing_project() {
- when(queue.prepareSubmit()).thenReturn(new CeTaskSubmit("TASK_1"));
+ when(queue.prepareSubmit()).thenReturn(new TestTaskSubmission("TASK_1"));
userSession.setGlobalPermissions(GlobalPermissions.SCAN_EXECUTION);
when(componentService.getNullableByKey("MY_PROJECT")).thenReturn(new ComponentDto().setUuid("P1"));
underTest.submit("MY_PROJECT", null, "My Project", IOUtils.toInputStream("{binary}"));
- verify(queue).submit(argThat(new TypeSafeMatcher<CeTaskSubmit>() {
+ verify(queue).submit(argThat(new TypeSafeMatcher<TaskSubmission>() {
@Override
- protected boolean matchesSafely(CeTaskSubmit submit) {
+ protected boolean matchesSafely(TaskSubmission submit) {
return submit.getType().equals(CeTaskTypes.REPORT) && submit.getComponentUuid().equals("P1") &&
submit.getUuid().equals("TASK_1");
}
@Test
public void provision_project_if_does_not_exist() throws Exception {
- when(queue.prepareSubmit()).thenReturn(new CeTaskSubmit("TASK_1"));
+ when(queue.prepareSubmit()).thenReturn(new TestTaskSubmission("TASK_1"));
userSession.setGlobalPermissions(GlobalPermissions.SCAN_EXECUTION, GlobalPermissions.PROVISIONING);
when(componentService.getNullableByKey("MY_PROJECT")).thenReturn(null);
when(componentService.create(any(NewComponent.class))).thenReturn(new ComponentDto().setUuid("P1"));
underTest.submit("MY_PROJECT", null, "My Project", IOUtils.toInputStream("{binary}"));
- verify(queue).submit(argThat(new TypeSafeMatcher<CeTaskSubmit>() {
+ verify(queue).submit(argThat(new TypeSafeMatcher<TaskSubmission>() {
@Override
- protected boolean matchesSafely(CeTaskSubmit submit) {
+ protected boolean matchesSafely(TaskSubmission submit) {
return submit.getType().equals(CeTaskTypes.REPORT) && submit.getComponentUuid().equals("P1") &&
submit.getUuid().equals("TASK_1");
}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation;
+
+import javax.annotation.Nullable;
+
+public class TestTaskSubmission implements TaskSubmission {
+ private final String uuid;
+ private String type;
+ private String componentUuid;
+ private String submitterLogin;
+
+ public TestTaskSubmission(String uuid) {
+ this.uuid = uuid;
+ }
+
+ @Override
+ public String getUuid() {
+ return uuid;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @Override
+ public TaskSubmission setType(String s) {
+ this.type = s;
+ return this;
+ }
+
+ @Override
+ public String getComponentUuid() {
+ return componentUuid;
+ }
+
+ @Override
+ public TaskSubmission setComponentUuid(@Nullable String s) {
+ this.componentUuid = s;
+ return this;
+ }
+
+ @Override
+ public String getSubmitterLogin() {
+ return submitterLogin;
+ }
+
+ @Override
+ public TaskSubmission setSubmitterLogin(@Nullable String s) {
+ this.submitterLogin = s;
+ return this;
+ }
+}
package org.sonar.server.computation.monitoring;
import org.junit.Test;
-import org.sonar.server.computation.CeQueue;
+import org.sonar.server.computation.CeQueueImpl;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
private static final long SUCCESS_COUNT = 13;
private static final long PROCESSING_TIME = 987;
- private ComputeEngineQueueMonitor underTest = new ComputeEngineQueueMonitor(new DumbCEQueueStatus(), mock(CeQueue.class));
+ private ComputeEngineQueueMonitor underTest = new ComputeEngineQueueMonitor(new DumbCEQueueStatus(), mock(CeQueueImpl.class));
@Test
public void name_is_ComputeEngineQueue() {
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation.ws;
+
+import java.util.Collections;
+import java.util.List;
+import org.junit.Rule;
+import org.junit.Test;
+import org.sonar.api.server.ws.WebService;
+import org.sonar.api.utils.System2;
+import org.sonar.api.web.UserRole;
+import org.sonar.core.util.Protobuf;
+import org.sonar.db.DbTester;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.server.plugins.MimeTypes;
+import org.sonar.server.tester.UserSessionRule;
+import org.sonar.server.ws.TestResponse;
+import org.sonar.server.ws.WsActionTester;
+import org.sonarqube.ws.WsCe;
+
+import static java.util.Arrays.asList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CeActivityWsActionTest {
+
+ @Rule
+ public UserSessionRule userSession = UserSessionRule.standalone();
+
+ @Rule
+ public DbTester dbTester = DbTester.create(System2.INSTANCE);
+
+ CeWsTaskFormatter formatter = new CeWsTaskFormatter(dbTester.getDbClient());
+ CeActivityWsAction underTest = new CeActivityWsAction(userSession, dbTester.getDbClient(), formatter);
+ WsActionTester tester = new WsActionTester(underTest);
+
+ @Test
+ public void get_all_past_activity() {
+ userSession.setGlobalPermissions(UserRole.ADMIN);
+ insert("T1", "PROJECT_1", CeActivityDto.Status.SUCCESS);
+ insert("T2", "PROJECT_2", CeActivityDto.Status.FAILED);
+
+ TestResponse wsResponse = tester.newRequest()
+ .setMediaType(MimeTypes.PROTOBUF)
+ .execute();
+
+ // verify the protobuf response
+ WsCe.ActivityResponse activityResponse = Protobuf.read(wsResponse.getInputStream(), WsCe.ActivityResponse.PARSER);
+ assertThat(activityResponse.getTasksCount()).isEqualTo(2);
+
+ // chronological order, from newest to oldest
+ assertThat(activityResponse.getTasks(0).getId()).isEqualTo("T2");
+ assertThat(activityResponse.getTasks(0).getStatus()).isEqualTo(WsCe.TaskStatus.FAILED);
+ assertThat(activityResponse.getTasks(0).getComponentId()).isEqualTo("PROJECT_2");
+ assertThat(activityResponse.getTasks(0).getExecutionTimeMs()).isEqualTo(500L);
+ assertThat(activityResponse.getTasks(1).getId()).isEqualTo("T1");
+ assertThat(activityResponse.getTasks(1).getStatus()).isEqualTo(WsCe.TaskStatus.SUCCESS);
+ assertThat(activityResponse.getTasks(1).getComponentId()).isEqualTo("PROJECT_1");
+ }
+
+ @Test
+ public void filter_by_status() {
+ userSession.setGlobalPermissions(UserRole.ADMIN);
+ insert("T1", "PROJECT_1", CeActivityDto.Status.SUCCESS);
+ insert("T2", "PROJECT_2", CeActivityDto.Status.FAILED);
+
+ TestResponse wsResponse = tester.newRequest()
+ .setParam("status", "FAILED")
+ .setMediaType(MimeTypes.PROTOBUF)
+ .execute();
+
+ WsCe.ActivityResponse activityResponse = Protobuf.read(wsResponse.getInputStream(), WsCe.ActivityResponse.PARSER);
+ assertThat(activityResponse.getTasksCount()).isEqualTo(1);
+ assertThat(activityResponse.getTasks(0).getId()).isEqualTo("T2");
+ }
+
+ @Test
+ public void filter_on_current_activities() {
+ userSession.setGlobalPermissions(UserRole.ADMIN);
+ // T2 is the current activity (the most recent one)
+ insert("T1", "PROJECT_1", CeActivityDto.Status.SUCCESS);
+ insert("T2", "PROJECT_1", CeActivityDto.Status.FAILED);
+
+ TestResponse wsResponse = tester.newRequest()
+ .setParam("onlyCurrents", "true")
+ .setMediaType(MimeTypes.PROTOBUF)
+ .execute();
+
+ WsCe.ActivityResponse activityResponse = Protobuf.read(wsResponse.getInputStream(), WsCe.ActivityResponse.PARSER);
+ assertThat(activityResponse.getTasksCount()).isEqualTo(1);
+ assertThat(activityResponse.getTasks(0).getId()).isEqualTo("T2");
+ }
+
+ @Test
+ public void paginate_results() {
+ userSession.setGlobalPermissions(UserRole.ADMIN);
+ insert("T1", "PROJECT_1", CeActivityDto.Status.SUCCESS);
+ insert("T2", "PROJECT_2", CeActivityDto.Status.FAILED);
+
+ assertPage(1, 1, 2, asList("T2"));
+ assertPage(2, 1, 2, asList("T1"));
+ assertPage(1, 10, 2, asList("T2", "T1"));
+ assertPage(2, 10, 2, Collections.<String>emptyList());
+ }
+
+ private void assertPage(int pageIndex, int pageSize, int expectedTotal, List<String> expectedOrderedTaskIds) {
+ TestResponse wsResponse = tester.newRequest()
+ .setMediaType(MimeTypes.PROTOBUF)
+ .setParam(WebService.Param.PAGE, Integer.toString(pageIndex))
+ .setParam(WebService.Param.PAGE_SIZE, Integer.toString(pageSize))
+ .execute();
+
+ WsCe.ActivityResponse activityResponse = Protobuf.read(wsResponse.getInputStream(), WsCe.ActivityResponse.PARSER);
+ assertThat(activityResponse.getPaging().getPageIndex()).isEqualTo(pageIndex);
+ assertThat(activityResponse.getPaging().getPageSize()).isEqualTo(pageSize);
+ assertThat(activityResponse.getPaging().getTotal()).isEqualTo(expectedTotal);
+
+ assertThat(activityResponse.getTasksCount()).isEqualTo(expectedOrderedTaskIds.size());
+ for (int i = 0; i < expectedOrderedTaskIds.size(); i++) {
+ String expectedTaskId = expectedOrderedTaskIds.get(i);
+ assertThat(activityResponse.getTasks(i).getId()).isEqualTo(expectedTaskId);
+ }
+ }
+
+ @Test
+ public void get_project_activity() {
+ userSession.addProjectUuidPermissions(UserRole.ADMIN, "PROJECT_1");
+ insert("T1", "PROJECT_1", CeActivityDto.Status.SUCCESS);
+ insert("T2", "PROJECT_2", CeActivityDto.Status.FAILED);
+
+ TestResponse wsResponse = tester.newRequest()
+ .setParam("componentId", "PROJECT_1")
+ .setMediaType(MimeTypes.PROTOBUF)
+ .execute();
+
+ // verify the protobuf response
+ WsCe.ActivityResponse activityResponse = Protobuf.read(wsResponse.getInputStream(), WsCe.ActivityResponse.PARSER);
+ assertThat(activityResponse.getTasksCount()).isEqualTo(1);
+ assertThat(activityResponse.getTasks(0).getId()).isEqualTo("T1");
+ assertThat(activityResponse.getTasks(0).getStatus()).isEqualTo(WsCe.TaskStatus.SUCCESS);
+ assertThat(activityResponse.getTasks(0).getComponentId()).isEqualTo("PROJECT_1");
+ }
+
+ private CeActivityDto insert(String taskUuid, String componentUuid, CeActivityDto.Status status) {
+ CeQueueDto queueDto = new CeQueueDto();
+ queueDto.setTaskType(CeTaskTypes.REPORT);
+ queueDto.setComponentUuid(componentUuid);
+ queueDto.setUuid(taskUuid);
+ CeActivityDto activityDto = new CeActivityDto(queueDto);
+ activityDto.setStatus(status);
+ activityDto.setExecutionTimeMs(500L);
+ dbTester.getDbClient().ceActivityDao().insert(dbTester.getSession(), activityDto);
+ dbTester.getSession().commit();
+ return activityDto;
+ }
+}
import org.sonar.core.util.Protobuf;
import org.sonar.db.ce.CeTaskTypes;
import org.sonar.server.computation.CeTask;
-import org.sonar.server.computation.ReportProcessingScheduler;
import org.sonar.server.computation.ReportSubmitter;
import org.sonar.server.plugins.MimeTypes;
import org.sonar.server.ws.TestResponse;
public class CeSubmitWsActionTest {
ReportSubmitter reportSubmitter = mock(ReportSubmitter.class);
- ReportProcessingScheduler reportProcessingScheduler = mock(ReportProcessingScheduler.class);
- CeSubmitWsAction underTest = new CeSubmitWsAction(reportSubmitter, reportProcessingScheduler);
+ CeSubmitWsAction underTest = new CeSubmitWsAction(reportSubmitter);
WsActionTester tester = new WsActionTester(underTest);
@Test
.execute();
verify(reportSubmitter).submit(eq("my_project"), Matchers.isNull(String.class), eq("My Project"), any(InputStream.class));
- verify(reportProcessingScheduler).startAnalysisTaskNow();
- // verify the protobuf response
WsCe.SubmitResponse submitResponse = Protobuf.read(wsResponse.getInputStream(), WsCe.SubmitResponse.PARSER);
assertThat(submitResponse.getTaskId()).isEqualTo("TASK_1");
assertThat(submitResponse.getProjectId()).isEqualTo("PROJECT_1");
}
@Test
- public void test_response_example() {
+ public void test_example_json_response() {
CeTask task = new CeTask("TASK_1", CeTaskTypes.REPORT, "PROJECT_1", "robert");
when(reportSubmitter.submit(eq("my_project"), Matchers.isNull(String.class), eq("My Project"), any(InputStream.class))).thenReturn(task);
import org.junit.Test;
import org.mockito.Mockito;
import org.sonar.api.server.ws.WebService;
-import org.sonar.server.computation.ReportProcessingScheduler;
import org.sonar.server.computation.ReportSubmitter;
import static org.assertj.core.api.Assertions.assertThat;
@Test
public void define() throws Exception {
- CeWsAction wsAction = new CeSubmitWsAction(mock(ReportSubmitter.class), mock(ReportProcessingScheduler.class));
+ CeWsAction wsAction = new CeSubmitWsAction(mock(ReportSubmitter.class));
CeWs ws = new CeWs(wsAction);
WebService.Context context = mock(WebService.Context.class, Mockito.RETURNS_DEEP_STUBS);
t.column 'execution_time_ms', :big_integer, :null => true
end
add_index 'ce_activity', 'uuid', :name => 'ce_activity_uuid', :unique => true
+ add_index 'ce_activity', 'component_uuid', :name => 'ce_activity_component_uuid'
end
end
*/
package org.sonar.batch.util;
+import com.google.common.base.Strings;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
+import javax.annotation.Nullable;
import org.apache.commons.lang.StringUtils;
public class BatchUtils {
return StringUtils.replace(cleanKey, ":", "_");
}
- public static String encodeForUrl(String url) {
+ public static String encodeForUrl(@Nullable String url) {
try {
- return URLEncoder.encode(url, "UTF-8");
+ return URLEncoder.encode(Strings.nullToEmpty(url), "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new IllegalStateException("Encoding not supported", e);
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.batch.util;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class BatchUtilsTest {
+
+ @Test
+ public void encodeForUrl() throws Exception {
+ assertThat(BatchUtils.encodeForUrl(null)).isEqualTo("");
+ assertThat(BatchUtils.encodeForUrl("")).isEqualTo("");
+ assertThat(BatchUtils.encodeForUrl("foo")).isEqualTo("foo");
+ assertThat(BatchUtils.encodeForUrl("foo&bar")).isEqualTo("foo%26bar");
+ }
+}
}
/**
- * Ordered by id asc -> oldest to newest
+ * Ordered by id desc -> newest to oldest
*/
public List<CeActivityDto> selectByQuery(DbSession dbSession, CeActivityQuery query, RowBounds rowBounds) {
return mapper(dbSession).selectByQuery(query, rowBounds);
}
+ public int countByQuery(DbSession dbSession, CeActivityQuery query) {
+ return mapper(dbSession).countByQuery(query);
+ }
+
private CeActivityMapper mapper(DbSession dbSession) {
return dbSession.getMapper(CeActivityMapper.class);
}
List<CeActivityDto> selectByQuery(@Param("query") CeActivityQuery query, RowBounds rowBounds);
+ int countByQuery(@Param("query") CeActivityQuery query);
+
void insert(CeActivityDto dto);
void updateIsLastToFalseForLastKey(@Param("isLastKey") String isLastKey, @Param("updatedAt") long updatedAt);
*/
package org.sonar.db.ce;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
+
public class CeActivityQuery {
private boolean onlyCurrents = false;
private CeActivityDto.Status status;
private String type;
+ @CheckForNull
public String getComponentUuid() {
return componentUuid;
}
- public CeActivityQuery setComponentUuid(String componentUuid) {
+ public CeActivityQuery setComponentUuid(@Nullable String componentUuid) {
this.componentUuid = componentUuid;
return this;
}
return this;
}
+ @CheckForNull
public CeActivityDto.Status getStatus() {
return status;
}
- public CeActivityQuery setStatus(CeActivityDto.Status status) {
+ public CeActivityQuery setStatus(@Nullable CeActivityDto.Status status) {
this.status = status;
return this;
}
+ @CheckForNull
public String getType() {
return type;
}
- public CeActivityQuery setType(String type) {
+ public CeActivityQuery setType(@Nullable String type) {
this.type = type;
return this;
}
order by ca.id desc
</select>
+ <select id="countByQuery" parameterType="map" resultType="int">
+ select count(ca.id)
+ from ce_activity ca
+ <where>
+ <if test="query.onlyCurrents">
+ ca.is_last=${_true}
+ </if>
+ <if test="query.componentUuid != null">
+ ca.component_uuid=#{query.componentUuid}
+ </if>
+ <if test="query.status != null">
+ ca.status=#{query.status}
+ </if>
+ <if test="query.type != null">
+ ca.task_type=#{query.type}
+ </if>
+ </where>
+ </select>
+
<insert id="insert" parameterType="org.sonar.db.ce.CeActivityDto" useGeneratedKeys="false">
insert into ce_activity
(uuid, component_uuid, status, task_type, is_last, is_last_key, submitter_login, submitted_at, started_at,
CREATE UNIQUE INDEX "CE_QUEUE_UUID" ON "CE_QUEUE" ("UUID");
CREATE UNIQUE INDEX "CE_ACTIVITY_UUID" ON "CE_ACTIVITY" ("UUID");
+
+CREATE INDEX "CE_ACTIVITY_COMPONENT_UUID" ON "CE_ACTIVITY" ("COMPONENT_UUID");
assertThat(dtos).extracting("uuid").containsExactly("TASK_4");
}
+ @Test
+ public void test_countByQuery() throws Exception {
+ insert("TASK_1", REPORT, "PROJECT_1", CeActivityDto.Status.SUCCESS);
+ insert("TASK_2", REPORT, "PROJECT_1", CeActivityDto.Status.FAILED);
+ insert("TASK_3", REPORT, "PROJECT_2", CeActivityDto.Status.SUCCESS);
+ insert("TASK_4", "views", null, CeActivityDto.Status.SUCCESS);
+
+ // no filters
+ CeActivityQuery query = new CeActivityQuery();
+ assertThat(underTest.countByQuery(db.getSession(), query)).isEqualTo(4);
+
+ // select by component uuid
+ query = new CeActivityQuery().setComponentUuid("PROJECT_1");
+ assertThat(underTest.countByQuery(db.getSession(), query)).isEqualTo(2);
+
+ // select by status
+ query = new CeActivityQuery().setStatus(CeActivityDto.Status.SUCCESS);
+ assertThat(underTest.countByQuery(db.getSession(), query)).isEqualTo(3);
+
+ // select by type
+ query = new CeActivityQuery().setType(REPORT);
+ assertThat(underTest.countByQuery(db.getSession(), query)).isEqualTo(3);
+ query = new CeActivityQuery().setType("views");
+ assertThat(underTest.countByQuery(db.getSession(), query)).isEqualTo(1);
+ }
+
@Test
public void deleteOlderThan() throws Exception {
insertWithDate("TASK_1", 1_450_000_000_000L);