@@ -20,16 +20,7 @@ | |||
package org.sonar.server.computation; | |||
import com.google.common.base.Optional; | |||
import org.sonar.api.server.ServerSide; | |||
import org.sonar.api.utils.System2; | |||
import org.sonar.core.util.UuidFactory; | |||
import org.sonar.db.DbClient; | |||
import org.sonar.db.DbSession; | |||
import org.sonar.db.ce.CeActivityDto; | |||
import org.sonar.db.ce.CeQueueDto; | |||
import org.sonar.server.computation.monitoring.CEQueueStatus; | |||
import static java.lang.String.format; | |||
/** | |||
* Queue of pending Compute Engine tasks. Both producer and consumer actions | |||
@@ -38,96 +29,40 @@ import static java.lang.String.format; | |||
* This class is decoupled from the regular task type {@link org.sonar.db.ce.CeTaskTypes#REPORT}. | |||
* </p> | |||
*/ | |||
@ServerSide | |||
public class CeQueue { | |||
private final System2 system2; | |||
private final DbClient dbClient; | |||
private final UuidFactory uuidFactory; | |||
private final CEQueueStatus queueStatus; | |||
private final CeQueueListener[] listeners; | |||
// state | |||
private boolean submitPaused = false; | |||
private boolean peekPaused = false; | |||
public CeQueue(System2 system2, DbClient dbClient, UuidFactory uuidFactory, | |||
CEQueueStatus queueStatus, CeQueueListener[] listeners) { | |||
this.system2 = system2; | |||
this.dbClient = dbClient; | |||
this.uuidFactory = uuidFactory; | |||
this.queueStatus = queueStatus; | |||
this.listeners = listeners; | |||
} | |||
public CeTaskSubmit prepareSubmit() { | |||
return new CeTaskSubmit(uuidFactory.create()); | |||
} | |||
public CeTask submit(CeTaskSubmit submit) { | |||
if (submitPaused) { | |||
throw new IllegalStateException("Compute Engine does not currently accept new tasks"); | |||
} | |||
CeTask task = new CeTask(submit); | |||
DbSession dbSession = dbClient.openSession(false); | |||
try { | |||
CeQueueDto dto = new CeQueueDto(); | |||
dto.setUuid(task.getUuid()); | |||
dto.setTaskType(task.getType()); | |||
dto.setComponentUuid(task.getComponentUuid()); | |||
dto.setStatus(CeQueueDto.Status.PENDING); | |||
dto.setSubmitterLogin(task.getSubmitterLogin()); | |||
dto.setStartedAt(null); | |||
dbClient.ceQueueDao().insert(dbSession, dto); | |||
dbSession.commit(); | |||
queueStatus.addReceived(); | |||
return task; | |||
} finally { | |||
dbClient.closeSession(dbSession); | |||
} | |||
} | |||
public Optional<CeTask> peek() { | |||
if (peekPaused) { | |||
return Optional.absent(); | |||
} | |||
DbSession dbSession = dbClient.openSession(false); | |||
try { | |||
Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession); | |||
if (!dto.isPresent()) { | |||
return Optional.absent(); | |||
} | |||
queueStatus.addInProgress(); | |||
return Optional.of(new CeTask(dto.get())); | |||
} finally { | |||
dbClient.closeSession(dbSession); | |||
} | |||
} | |||
public interface CeQueue { | |||
/** | |||
* Build an instance of {@link TaskSubmission} required for {@link #submit(TaskSubmission)}. It allows | |||
* to enforce that task ids are generated by the queue. It's used also for having access | |||
* to the id before submitting the task to the queue. | |||
*/ | |||
TaskSubmission prepareSubmit(); | |||
public boolean cancel(String taskUuid) { | |||
DbSession dbSession = dbClient.openSession(false); | |||
try { | |||
Optional<CeQueueDto> queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, taskUuid); | |||
if (queueDto.isPresent()) { | |||
if (!queueDto.get().getStatus().equals(CeQueueDto.Status.PENDING)) { | |||
throw new IllegalStateException(String.format("Task is in progress and can't be cancelled [uuid=%s]", taskUuid)); | |||
} | |||
cancel(dbSession, queueDto.get()); | |||
return true; | |||
} | |||
return false; | |||
} finally { | |||
dbClient.closeSession(dbSession); | |||
} | |||
} | |||
/** | |||
* Submits a task to the queue. The task is processed asynchronously. | |||
* If submits are paused (see {@link #isSubmitPaused()}, then an | |||
* unchecked exception is thrown. | |||
*/ | |||
CeTask submit(TaskSubmission submission); | |||
void cancel(DbSession dbSession, CeQueueDto q) { | |||
CeActivityDto activityDto = new CeActivityDto(q); | |||
activityDto.setStatus(CeActivityDto.Status.CANCELED); | |||
remove(dbSession, new CeTask(q), q, activityDto); | |||
} | |||
/** | |||
* Peek the oldest task in status {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}. | |||
* The task status is changed to {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}. | |||
* Does not return anything if the queue is paused (see {@link #isPeekPaused()}. | |||
* | |||
* <p>Only a single task can be peeked by project.</p> | |||
* | |||
* <p>An unchecked exception may be thrown on technical errors (db connection, ...).</p> | |||
*/ | |||
Optional<CeTask> peek(); | |||
/** | |||
* Cancels a task in status {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}. An unchecked | |||
* exception is thrown if the status is not {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}. | |||
* The method does nothing and returns {@code false} if the task does not exist. | |||
* | |||
* @return true if the task exists and is successfully canceled. | |||
*/ | |||
boolean cancel(String taskUuid); | |||
/** | |||
* Removes all the tasks from the queue, whatever their status. They are marked | |||
@@ -137,9 +72,7 @@ public class CeQueue { | |||
* | |||
* @return the number of canceled tasks | |||
*/ | |||
public int clear() { | |||
return cancelAll(true); | |||
} | |||
int clear(); | |||
/** | |||
* Similar as {@link #clear()}, except that the tasks with status | |||
@@ -148,83 +81,25 @@ public class CeQueue { | |||
* | |||
* @return the number of canceled tasks | |||
*/ | |||
public int cancelAll() { | |||
return cancelAll(false); | |||
} | |||
int cancelAll(); | |||
private int cancelAll(boolean includeInProgress) { | |||
int count = 0; | |||
DbSession dbSession = dbClient.openSession(false); | |||
try { | |||
for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) { | |||
if (includeInProgress || !queueDto.getStatus().equals(CeQueueDto.Status.IN_PROGRESS)) { | |||
cancel(dbSession, queueDto); | |||
count++; | |||
} | |||
} | |||
return count; | |||
} finally { | |||
dbClient.closeSession(dbSession); | |||
} | |||
} | |||
public void remove(CeTask task, CeActivityDto.Status status) { | |||
DbSession dbSession = dbClient.openSession(false); | |||
try { | |||
Optional<CeQueueDto> queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, task.getUuid()); | |||
if (!queueDto.isPresent()) { | |||
throw new IllegalStateException(format("Task does not exist anymore: %s", task)); | |||
} | |||
CeActivityDto activityDto = new CeActivityDto(queueDto.get()); | |||
activityDto.setStatus(status); | |||
Long startedAt = activityDto.getStartedAt(); | |||
if (startedAt != null) { | |||
activityDto.setFinishedAt(system2.now()); | |||
long executionTime = activityDto.getFinishedAt() - startedAt; | |||
activityDto.setExecutionTimeMs(executionTime); | |||
if (status == CeActivityDto.Status.SUCCESS) { | |||
queueStatus.addSuccess(executionTime); | |||
} else { | |||
queueStatus.addError(executionTime); | |||
} | |||
} | |||
remove(dbSession, task, queueDto.get(), activityDto); | |||
} finally { | |||
dbClient.closeSession(dbSession); | |||
} | |||
} | |||
private void remove(DbSession dbSession, CeTask task, CeQueueDto queueDto, CeActivityDto activityDto) { | |||
dbClient.ceActivityDao().insert(dbSession, activityDto); | |||
dbClient.ceQueueDao().deleteByUuid(dbSession, queueDto.getUuid()); | |||
dbSession.commit(); | |||
for (CeQueueListener listener : listeners) { | |||
listener.onRemoved(task, activityDto.getStatus()); | |||
} | |||
} | |||
/** | |||
* Removes a task from the queue and registers it to past activities. This method | |||
* is called by Compute Engine workers when task is processed. | |||
* | |||
* <p>An unchecked exception is thrown if the task does not exist in the queue.</p> | |||
*/ | |||
void remove(CeTask task, CeActivityDto.Status status); | |||
public void pauseSubmit() { | |||
this.submitPaused = true; | |||
} | |||
void pauseSubmit(); | |||
public void resumeSubmit() { | |||
this.submitPaused = false; | |||
} | |||
void resumeSubmit(); | |||
public boolean isSubmitPaused() { | |||
return submitPaused; | |||
} | |||
boolean isSubmitPaused(); | |||
public void pausePeek() { | |||
this.peekPaused = true; | |||
} | |||
void pausePeek(); | |||
public void resumePeek() { | |||
this.peekPaused = false; | |||
} | |||
void resumePeek(); | |||
public boolean isPeekPaused() { | |||
return peekPaused; | |||
} | |||
boolean isPeekPaused(); | |||
} |
@@ -0,0 +1,91 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation; | |||
import java.util.HashSet; | |||
import java.util.Set; | |||
import org.sonar.api.platform.ServerUpgradeStatus; | |||
import org.sonar.api.server.ServerSide; | |||
import org.sonar.api.utils.log.Logger; | |||
import org.sonar.api.utils.log.Loggers; | |||
import org.sonar.db.DbClient; | |||
import org.sonar.db.DbSession; | |||
import org.sonar.db.ce.CeQueueDto; | |||
import org.sonar.db.ce.CeTaskTypes; | |||
/** | |||
* Cleans-up the Compute Engine queue and resets the JMX counters. | |||
* CE workers must not be started before execution of this class. | |||
*/ | |||
@ServerSide | |||
public class CeQueueCleaner { | |||
private static final Logger LOGGER = Loggers.get(CeQueueCleaner.class); | |||
private final DbClient dbClient; | |||
private final ServerUpgradeStatus serverUpgradeStatus; | |||
private final ReportFiles reportFiles; | |||
private final CeQueueImpl queue; | |||
public CeQueueCleaner(DbClient dbClient, ServerUpgradeStatus serverUpgradeStatus, ReportFiles reportFiles, CeQueueImpl queue) { | |||
this.dbClient = dbClient; | |||
this.serverUpgradeStatus = serverUpgradeStatus; | |||
this.reportFiles = reportFiles; | |||
this.queue = queue; | |||
} | |||
public void clean(DbSession dbSession) { | |||
if (serverUpgradeStatus.isUpgraded()) { | |||
cleanOnUpgrade(); | |||
} else { | |||
verifyConsistency(dbSession); | |||
} | |||
} | |||
private void cleanOnUpgrade() { | |||
// we assume that pending tasks are not compatible with the new version | |||
// and can't be processed | |||
LOGGER.info("Cancel all pending tasks (due to upgrade)"); | |||
queue.clear(); | |||
} | |||
private void verifyConsistency(DbSession dbSession) { | |||
// server is not being upgraded | |||
dbClient.ceQueueDao().resetAllToPendingStatus(dbSession); | |||
dbSession.commit(); | |||
// verify that the report files are available for the tasks in queue | |||
Set<String> uuidsInQueue = new HashSet<>(); | |||
for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) { | |||
uuidsInQueue.add(queueDto.getUuid()); | |||
if (CeTaskTypes.REPORT.equals(queueDto.getTaskType()) && !reportFiles.fileForUuid(queueDto.getUuid()).exists()) { | |||
// the report is not available on file system | |||
queue.cancel(dbSession, queueDto); | |||
} | |||
} | |||
// clean-up filesystem | |||
for (String uuid : reportFiles.listUuids()) { | |||
if (!uuidsInQueue.contains(uuid)) { | |||
reportFiles.deleteIfExists(uuid); | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,285 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation; | |||
import com.google.common.base.Optional; | |||
import com.google.common.base.Strings; | |||
import java.util.concurrent.atomic.AtomicBoolean; | |||
import javax.annotation.Nullable; | |||
import org.sonar.api.server.ServerSide; | |||
import org.sonar.api.utils.System2; | |||
import org.sonar.core.util.UuidFactory; | |||
import org.sonar.db.DbClient; | |||
import org.sonar.db.DbSession; | |||
import org.sonar.db.ce.CeActivityDto; | |||
import org.sonar.db.ce.CeQueueDto; | |||
import org.sonar.server.computation.monitoring.CEQueueStatus; | |||
import static com.google.common.base.Preconditions.checkArgument; | |||
import static com.google.common.base.Preconditions.checkState; | |||
import static java.lang.String.format; | |||
@ServerSide | |||
public class CeQueueImpl implements CeQueue { | |||
private final System2 system2; | |||
private final DbClient dbClient; | |||
private final UuidFactory uuidFactory; | |||
private final CEQueueStatus queueStatus; | |||
private final CeQueueListener[] listeners; | |||
// state | |||
private AtomicBoolean submitPaused = new AtomicBoolean(false); | |||
private AtomicBoolean peekPaused = new AtomicBoolean(false); | |||
public CeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory, | |||
CEQueueStatus queueStatus, CeQueueListener[] listeners) { | |||
this.system2 = system2; | |||
this.dbClient = dbClient; | |||
this.uuidFactory = uuidFactory; | |||
this.queueStatus = queueStatus; | |||
this.listeners = listeners; | |||
} | |||
@Override | |||
public TaskSubmission prepareSubmit() { | |||
return new TaskSubmissionImpl(uuidFactory.create()); | |||
} | |||
@Override | |||
public CeTask submit(TaskSubmission submission) { | |||
checkArgument(!Strings.isNullOrEmpty(submission.getUuid())); | |||
checkArgument(!Strings.isNullOrEmpty(submission.getType())); | |||
checkArgument(submission instanceof TaskSubmissionImpl); | |||
checkState(!submitPaused.get(), "Compute Engine does not currently accept new tasks"); | |||
CeTask task = new CeTask(submission); | |||
DbSession dbSession = dbClient.openSession(false); | |||
try { | |||
CeQueueDto dto = createQueueDtoForSubmit(task); | |||
dbClient.ceQueueDao().insert(dbSession, dto); | |||
dbSession.commit(); | |||
queueStatus.addReceived(); | |||
return task; | |||
} finally { | |||
dbClient.closeSession(dbSession); | |||
} | |||
} | |||
private CeQueueDto createQueueDtoForSubmit(CeTask task) { | |||
CeQueueDto dto = new CeQueueDto(); | |||
dto.setUuid(task.getUuid()); | |||
dto.setTaskType(task.getType()); | |||
dto.setComponentUuid(task.getComponentUuid()); | |||
dto.setStatus(CeQueueDto.Status.PENDING); | |||
dto.setSubmitterLogin(task.getSubmitterLogin()); | |||
dto.setStartedAt(null); | |||
return dto; | |||
} | |||
@Override | |||
public Optional<CeTask> peek() { | |||
if (peekPaused.get()) { | |||
return Optional.absent(); | |||
} | |||
DbSession dbSession = dbClient.openSession(false); | |||
try { | |||
Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession); | |||
CeTask task = null; | |||
if (dto.isPresent()) { | |||
queueStatus.addInProgress(); | |||
task = new CeTask(dto.get()); | |||
} | |||
return Optional.fromNullable(task); | |||
} finally { | |||
dbClient.closeSession(dbSession); | |||
} | |||
} | |||
@Override | |||
public boolean cancel(String taskUuid) { | |||
DbSession dbSession = dbClient.openSession(false); | |||
try { | |||
Optional<CeQueueDto> queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, taskUuid); | |||
if (queueDto.isPresent()) { | |||
checkState(CeQueueDto.Status.PENDING.equals(queueDto.get().getStatus()), "Task is in progress and can't be canceled [uuid=%s]", taskUuid); | |||
cancel(dbSession, queueDto.get()); | |||
return true; | |||
} | |||
return false; | |||
} finally { | |||
dbClient.closeSession(dbSession); | |||
} | |||
} | |||
void cancel(DbSession dbSession, CeQueueDto q) { | |||
CeActivityDto activityDto = new CeActivityDto(q); | |||
activityDto.setStatus(CeActivityDto.Status.CANCELED); | |||
remove(dbSession, new CeTask(q), q, activityDto); | |||
} | |||
@Override | |||
public int clear() { | |||
return cancelAll(true); | |||
} | |||
@Override | |||
public int cancelAll() { | |||
return cancelAll(false); | |||
} | |||
private int cancelAll(boolean includeInProgress) { | |||
int count = 0; | |||
DbSession dbSession = dbClient.openSession(false); | |||
try { | |||
for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) { | |||
if (includeInProgress || !queueDto.getStatus().equals(CeQueueDto.Status.IN_PROGRESS)) { | |||
cancel(dbSession, queueDto); | |||
count++; | |||
} | |||
} | |||
return count; | |||
} finally { | |||
dbClient.closeSession(dbSession); | |||
} | |||
} | |||
@Override | |||
public void remove(CeTask task, CeActivityDto.Status status) { | |||
DbSession dbSession = dbClient.openSession(false); | |||
try { | |||
Optional<CeQueueDto> queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, task.getUuid()); | |||
if (!queueDto.isPresent()) { | |||
throw new IllegalStateException(format("Task does not exist anymore: %s", task)); | |||
} | |||
CeActivityDto activityDto = new CeActivityDto(queueDto.get()); | |||
activityDto.setStatus(status); | |||
updateQueueStatus(status, activityDto); | |||
remove(dbSession, task, queueDto.get(), activityDto); | |||
} finally { | |||
dbClient.closeSession(dbSession); | |||
} | |||
} | |||
private void updateQueueStatus(CeActivityDto.Status status, CeActivityDto activityDto) { | |||
Long startedAt = activityDto.getStartedAt(); | |||
if (startedAt == null) { | |||
return; | |||
} | |||
activityDto.setFinishedAt(system2.now()); | |||
long executionTime = activityDto.getFinishedAt() - startedAt; | |||
activityDto.setExecutionTimeMs(executionTime); | |||
if (status == CeActivityDto.Status.SUCCESS) { | |||
queueStatus.addSuccess(executionTime); | |||
} else { | |||
queueStatus.addError(executionTime); | |||
} | |||
} | |||
private void remove(DbSession dbSession, CeTask task, CeQueueDto queueDto, CeActivityDto activityDto) { | |||
dbClient.ceActivityDao().insert(dbSession, activityDto); | |||
dbClient.ceQueueDao().deleteByUuid(dbSession, queueDto.getUuid()); | |||
dbSession.commit(); | |||
for (CeQueueListener listener : listeners) { | |||
listener.onRemoved(task, activityDto.getStatus()); | |||
} | |||
} | |||
@Override | |||
public void pauseSubmit() { | |||
this.submitPaused.set(true); | |||
} | |||
@Override | |||
public void resumeSubmit() { | |||
this.submitPaused.set(false); | |||
} | |||
@Override | |||
public boolean isSubmitPaused() { | |||
return submitPaused.get(); | |||
} | |||
@Override | |||
public void pausePeek() { | |||
this.peekPaused.set(true); | |||
} | |||
@Override | |||
public void resumePeek() { | |||
this.peekPaused.set(false); | |||
} | |||
@Override | |||
public boolean isPeekPaused() { | |||
return peekPaused.get(); | |||
} | |||
private static class TaskSubmissionImpl implements TaskSubmission { | |||
private final String uuid; | |||
private String type; | |||
private String componentUuid; | |||
private String submitterLogin; | |||
private TaskSubmissionImpl(String uuid) { | |||
this.uuid = uuid; | |||
} | |||
@Override | |||
public String getUuid() { | |||
return uuid; | |||
} | |||
@Override | |||
public String getType() { | |||
return type; | |||
} | |||
@Override | |||
public TaskSubmission setType(@Nullable String s) { | |||
this.type = s; | |||
return this; | |||
} | |||
@Override | |||
public String getComponentUuid() { | |||
return componentUuid; | |||
} | |||
@Override | |||
public TaskSubmission setComponentUuid(@Nullable String s) { | |||
this.componentUuid = s; | |||
return this; | |||
} | |||
@Override | |||
public String getSubmitterLogin() { | |||
return submitterLogin; | |||
} | |||
@Override | |||
public TaskSubmission setSubmitterLogin(@Nullable String s) { | |||
this.submitterLogin = s; | |||
return this; | |||
} | |||
} | |||
} |
@@ -19,89 +19,48 @@ | |||
*/ | |||
package org.sonar.server.computation; | |||
import java.util.HashSet; | |||
import java.util.Set; | |||
import org.sonar.api.platform.ServerUpgradeStatus; | |||
import org.picocontainer.Startable; | |||
import org.sonar.api.server.ServerSide; | |||
import org.sonar.api.utils.log.Logger; | |||
import org.sonar.api.utils.log.Loggers; | |||
import org.sonar.db.DbClient; | |||
import org.sonar.db.DbSession; | |||
import org.sonar.db.ce.CeQueueDto; | |||
import org.sonar.db.ce.CeTaskTypes; | |||
import org.sonar.server.computation.monitoring.CEQueueStatus; | |||
/** | |||
* Cleans-up the Compute Engine queue and resets the JMX counters. | |||
* CE workers must not be started before execution of this class. | |||
* Cleans-up the queue, initializes JMX counters then schedule | |||
* the execution of workers. That allows to not prevent workers | |||
* from peeking the queue before it's ready. | |||
*/ | |||
@ServerSide | |||
public class CeQueueInitializer { | |||
private static final Logger LOGGER = Loggers.get(CeQueueInitializer.class); | |||
public class CeQueueInitializer implements Startable { | |||
private final DbClient dbClient; | |||
private final ServerUpgradeStatus serverUpgradeStatus; | |||
private final ReportFiles reportFiles; | |||
private final CeQueue queue; | |||
private final CEQueueStatus queueStatus; | |||
private final CeQueueCleaner cleaner; | |||
private final ReportProcessingScheduler scheduler; | |||
public CeQueueInitializer(DbClient dbClient, ServerUpgradeStatus serverUpgradeStatus, ReportFiles reportFiles, | |||
CeQueue queue, CEQueueStatus queueStatus) { | |||
public CeQueueInitializer(DbClient dbClient, CEQueueStatus queueStatus, CeQueueCleaner cleaner, ReportProcessingScheduler scheduler) { | |||
this.dbClient = dbClient; | |||
this.serverUpgradeStatus = serverUpgradeStatus; | |||
this.reportFiles = reportFiles; | |||
this.queue = queue; | |||
this.queueStatus = queueStatus; | |||
this.cleaner = cleaner; | |||
this.scheduler = scheduler; | |||
} | |||
/** | |||
* Do not rename. Used at server startup. | |||
*/ | |||
@Override | |||
public void start() { | |||
DbSession dbSession = dbClient.openSession(false); | |||
try { | |||
initJmxCounters(dbSession); | |||
if (serverUpgradeStatus.isUpgraded()) { | |||
cleanOnUpgrade(); | |||
} else { | |||
verifyConsistency(dbSession); | |||
} | |||
cleaner.clean(dbSession); | |||
scheduler.schedule(); | |||
} finally { | |||
dbClient.closeSession(dbSession); | |||
} | |||
} | |||
private void cleanOnUpgrade() { | |||
// we assume that pending tasks are not compatible with the new version | |||
// and can't be processed | |||
LOGGER.info("Cancel all pending tasks (due to upgrade)"); | |||
queue.clear(); | |||
} | |||
private void verifyConsistency(DbSession dbSession) { | |||
// server is not being upgraded | |||
dbClient.ceQueueDao().resetAllToPendingStatus(dbSession); | |||
dbSession.commit(); | |||
// verify that the report files are available for the tasks in queue | |||
Set<String> uuidsInQueue = new HashSet<>(); | |||
for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) { | |||
uuidsInQueue.add(queueDto.getUuid()); | |||
if (CeTaskTypes.REPORT.equals(queueDto.getTaskType()) && !reportFiles.fileForUuid(queueDto.getUuid()).exists()) { | |||
// the report is not available on file system | |||
queue.cancel(dbSession, queueDto); | |||
} | |||
} | |||
// clean-up filesystem | |||
for (String uuid : reportFiles.listUuids()) { | |||
if (!uuidsInQueue.contains(uuid)) { | |||
reportFiles.deleteIfExists(uuid); | |||
} | |||
} | |||
@Override | |||
public void stop() { | |||
// nothing to do | |||
} | |||
private void initJmxCounters(DbSession dbSession) { |
@@ -25,6 +25,8 @@ import javax.annotation.Nullable; | |||
import javax.annotation.concurrent.Immutable; | |||
import org.sonar.db.ce.CeQueueDto; | |||
import static java.util.Objects.requireNonNull; | |||
@Immutable | |||
public class CeTask { | |||
@@ -34,24 +36,18 @@ public class CeTask { | |||
private final String submitterLogin; | |||
public CeTask(String uuid, String type, @Nullable String componentUuid, @Nullable String submitterLogin) { | |||
this.uuid = uuid; | |||
this.type = type; | |||
this.uuid = requireNonNull(uuid); | |||
this.type = requireNonNull(type); | |||
this.componentUuid = componentUuid; | |||
this.submitterLogin = submitterLogin; | |||
} | |||
CeTask(CeTaskSubmit submit) { | |||
this.uuid = submit.getUuid(); | |||
this.type = submit.getType(); | |||
this.componentUuid = submit.getComponentUuid(); | |||
this.submitterLogin = submit.getSubmitterLogin(); | |||
CeTask(TaskSubmission submit) { | |||
this(submit.getUuid(), submit.getType(), submit.getComponentUuid(), submit.getSubmitterLogin()); | |||
} | |||
CeTask(CeQueueDto dto) { | |||
this.uuid = dto.getUuid(); | |||
this.type = dto.getTaskType(); | |||
this.componentUuid = dto.getComponentUuid(); | |||
this.submitterLogin = dto.getSubmitterLogin(); | |||
this(dto.getUuid(), dto.getTaskType(), dto.getComponentUuid(), dto.getSubmitterLogin()); | |||
} | |||
public String getUuid() { | |||
@@ -83,7 +79,7 @@ public class CeTask { | |||
} | |||
@Override | |||
public boolean equals(Object o) { | |||
public boolean equals(@Nullable Object o) { | |||
if (this == o) { | |||
return true; | |||
} |
@@ -45,7 +45,7 @@ public class ReportFiles { | |||
this.settings = settings; | |||
} | |||
public void save(CeTaskSubmit taskSubmit, InputStream reportInput) { | |||
public void save(TaskSubmission taskSubmit, InputStream reportInput) { | |||
File file = fileForUuid(taskSubmit.getUuid()); | |||
try { | |||
FileUtils.copyInputStreamToFile(reportInput, file); |
@@ -21,13 +21,11 @@ | |||
package org.sonar.server.computation; | |||
import java.util.concurrent.TimeUnit; | |||
import org.sonar.api.platform.Server; | |||
import org.sonar.api.platform.ServerStartHandler; | |||
/** | |||
* Adds tasks to the Compute Engine to process batch reports. | |||
*/ | |||
public class ReportProcessingScheduler implements ServerStartHandler { | |||
public class ReportProcessingScheduler { | |||
private final ReportProcessingSchedulerExecutorService reportProcessingSchedulerExecutorService; | |||
private final ComputeEngineProcessingQueue processingQueue; | |||
private final CeWorker worker; | |||
@@ -43,17 +41,12 @@ public class ReportProcessingScheduler implements ServerStartHandler { | |||
this.processingQueue = processingQueue; | |||
this.worker = worker; | |||
this.delayBetweenTasks = 10; | |||
this.delayBetweenTasks = 1; | |||
this.delayForFirstStart = 0; | |||
this.timeUnit = TimeUnit.SECONDS; | |||
} | |||
public void startAnalysisTaskNow() { | |||
reportProcessingSchedulerExecutorService.execute(new AddReportProcessingToCEProcessingQueue()); | |||
} | |||
@Override | |||
public void onServerStart(Server server) { | |||
public void schedule() { | |||
reportProcessingSchedulerExecutorService.scheduleAtFixedRate(new AddReportProcessingToCEProcessingQueue(), delayForFirstStart, delayBetweenTasks, timeUnit); | |||
} | |||
@@ -22,7 +22,7 @@ package org.sonar.server.computation; | |||
import org.sonar.server.util.StoppableScheduledExecutorService; | |||
/** | |||
* ExecutorService responsible for adding {@link CeWorkerImpl} to {@link CeQueue} on a regular basis. | |||
* ExecutorService responsible for adding {@link CeWorkerImpl} to {@link CeQueueImpl} on a regular basis. | |||
*/ | |||
public interface ReportProcessingSchedulerExecutorService extends StoppableScheduledExecutorService { | |||
} |
@@ -63,7 +63,7 @@ public class ReportSubmitter { | |||
} | |||
// the report file must be saved before submitting the task | |||
CeTaskSubmit submit = queue.prepareSubmit(); | |||
TaskSubmission submit = queue.prepareSubmit(); | |||
reportFiles.save(submit, reportInput); | |||
submit.setType(CeTaskTypes.REPORT); |
@@ -0,0 +1,43 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation; | |||
import javax.annotation.CheckForNull; | |||
import javax.annotation.Nullable; | |||
public interface TaskSubmission { | |||
String getUuid(); | |||
String getType(); | |||
TaskSubmission setType(String s); | |||
@CheckForNull | |||
String getComponentUuid(); | |||
TaskSubmission setComponentUuid(@Nullable String s); | |||
@CheckForNull | |||
String getSubmitterLogin(); | |||
TaskSubmission setSubmitterLogin(@Nullable String s); | |||
} |
@@ -25,10 +25,12 @@ import org.sonar.api.server.ws.Request; | |||
import org.sonar.api.server.ws.Response; | |||
import org.sonar.api.server.ws.WebService; | |||
import org.sonar.api.web.UserRole; | |||
import org.sonar.core.util.Uuids; | |||
import org.sonar.db.DbClient; | |||
import org.sonar.db.DbSession; | |||
import org.sonar.db.ce.CeActivityDto; | |||
import org.sonar.db.ce.CeActivityQuery; | |||
import org.sonar.db.ce.CeTaskTypes; | |||
import org.sonar.server.user.UserSession; | |||
import org.sonar.server.ws.WsUtils; | |||
import org.sonarqube.ws.Common; | |||
@@ -40,6 +42,11 @@ import org.sonarqube.ws.WsCe; | |||
*/ | |||
public class CeActivityWsAction implements CeWsAction { | |||
private static final String PARAM_COMPONENT_ID = "componentId"; | |||
private static final String PARAM_TYPE = "type"; | |||
private static final String PARAM_STATUS = "status"; | |||
private static final String PARAM_ONLY_CURRENTS = "onlyCurrents"; | |||
private final UserSession userSession; | |||
private final DbClient dbClient; | |||
private final CeWsTaskFormatter formatter; | |||
@@ -52,32 +59,71 @@ public class CeActivityWsAction implements CeWsAction { | |||
@Override | |||
public void define(WebService.NewController controller) { | |||
controller.createAction("activity") | |||
WebService.NewAction action = controller.createAction("activity") | |||
.setInternal(true) | |||
.setResponseExample(getClass().getResource("CeActivityWsAction/example.json")) | |||
.setHandler(this); | |||
action.createParam(PARAM_COMPONENT_ID) | |||
.setDescription("Optional id of the component (project) to filter on") | |||
.setExampleValue(Uuids.UUID_EXAMPLE_03); | |||
action.createParam(PARAM_STATUS) | |||
.setDescription("Optional filter on task status") | |||
.setPossibleValues(CeActivityDto.Status.values()); | |||
action.createParam(PARAM_ONLY_CURRENTS) | |||
.setDescription("Optional filter on the current activities (only the most recent task by project)") | |||
.setBooleanPossibleValues() | |||
.setDefaultValue("false"); | |||
action.createParam(PARAM_TYPE) | |||
.setDescription("Optional filter on task type") | |||
.setExampleValue(CeTaskTypes.REPORT); | |||
action.addPagingParams(10); | |||
} | |||
@Override | |||
public void handle(Request wsRequest, Response wsResponse) throws Exception { | |||
userSession.checkGlobalPermission(UserRole.ADMIN); | |||
DbSession dbSession = dbClient.openSession(false); | |||
try { | |||
CeActivityQuery query = new CeActivityQuery(); | |||
List<CeActivityDto> dtos = dbClient.ceActivityDao().selectByQuery(dbSession, query, new RowBounds(0, 100)); | |||
CeActivityQuery query = readQuery(wsRequest); | |||
RowBounds rowBounds = readMyBatisRowBounds(wsRequest); | |||
List<CeActivityDto> dtos = dbClient.ceActivityDao().selectByQuery(dbSession, query, rowBounds); | |||
int total = dbClient.ceActivityDao().countByQuery(dbSession, query); | |||
WsCe.ActivityResponse.Builder wsResponseBuilder = WsCe.ActivityResponse.newBuilder(); | |||
wsResponseBuilder.addAllTasks(formatter.formatActivity(dbSession, dtos)); | |||
Common.Paging paging = Common.Paging.newBuilder() | |||
.setPageIndex(1) | |||
.setPageSize(dtos.size()) | |||
.setTotal(dtos.size()) | |||
.build(); | |||
wsResponseBuilder.setPaging(paging); | |||
wsResponseBuilder.setPaging(Common.Paging.newBuilder() | |||
.setPageIndex(wsRequest.mandatoryParamAsInt(WebService.Param.PAGE)) | |||
.setPageSize(wsRequest.mandatoryParamAsInt(WebService.Param.PAGE_SIZE)) | |||
.setTotal(total)); | |||
WsUtils.writeProtobuf(wsResponseBuilder.build(), wsRequest, wsResponse); | |||
} finally { | |||
dbClient.closeSession(dbSession); | |||
} | |||
} | |||
private CeActivityQuery readQuery(Request wsRequest) { | |||
CeActivityQuery query = new CeActivityQuery(); | |||
query.setType(wsRequest.param(PARAM_TYPE)); | |||
query.setOnlyCurrents(wsRequest.mandatoryParamAsBoolean(PARAM_ONLY_CURRENTS)); | |||
String status = wsRequest.param(PARAM_STATUS); | |||
if (status != null) { | |||
query.setStatus(CeActivityDto.Status.valueOf(status)); | |||
} | |||
String componentId = wsRequest.param(PARAM_COMPONENT_ID); | |||
if (componentId == null) { | |||
userSession.checkGlobalPermission(UserRole.ADMIN); | |||
} else { | |||
userSession.checkProjectUuidPermission(UserRole.ADMIN, componentId); | |||
query.setComponentUuid(componentId); | |||
} | |||
return query; | |||
} | |||
private static RowBounds readMyBatisRowBounds(Request wsRequest) { | |||
int pageIndex = wsRequest.mandatoryParamAsInt(WebService.Param.PAGE); | |||
int pageSize = wsRequest.mandatoryParamAsInt(WebService.Param.PAGE_SIZE); | |||
return new RowBounds((pageIndex - 1) * pageSize, pageSize); | |||
} | |||
} |
@@ -19,14 +19,12 @@ | |||
*/ | |||
package org.sonar.server.computation.ws; | |||
import com.google.common.base.Strings; | |||
import java.io.InputStream; | |||
import org.apache.commons.lang.StringUtils; | |||
import org.sonar.api.server.ws.Request; | |||
import org.sonar.api.server.ws.Response; | |||
import org.sonar.api.server.ws.WebService; | |||
import org.sonar.server.computation.CeTask; | |||
import org.sonar.server.computation.ReportProcessingScheduler; | |||
import org.sonar.server.computation.ReportSubmitter; | |||
import org.sonar.server.ws.WsUtils; | |||
import org.sonarqube.ws.WsCe; | |||
@@ -43,11 +41,9 @@ public class CeSubmitWsAction implements CeWsAction { | |||
public static final String PARAM_REPORT_DATA = "report"; | |||
private final ReportSubmitter reportSubmitter; | |||
private final ReportProcessingScheduler reportProcessingScheduler; | |||
public CeSubmitWsAction(ReportSubmitter reportSubmitter, ReportProcessingScheduler reportProcessingScheduler) { | |||
public CeSubmitWsAction(ReportSubmitter reportSubmitter) { | |||
this.reportSubmitter = reportSubmitter; | |||
this.reportProcessingScheduler = reportProcessingScheduler; | |||
} | |||
@Override | |||
@@ -85,12 +81,11 @@ public class CeSubmitWsAction implements CeWsAction { | |||
@Override | |||
public void handle(Request wsRequest, Response wsResponse) throws Exception { | |||
String projectKey = wsRequest.mandatoryParam(PARAM_PROJECT_KEY); | |||
String projectBranch = Strings.emptyToNull(wsRequest.param(PARAM_PROJECT_BRANCH)); | |||
String projectBranch = wsRequest.param(PARAM_PROJECT_BRANCH); | |||
String projectName = StringUtils.defaultIfBlank(wsRequest.param(PARAM_PROJECT_NAME), projectKey); | |||
InputStream reportInput = wsRequest.paramAsInputStream(PARAM_REPORT_DATA); | |||
CeTask task = reportSubmitter.submit(projectKey, projectBranch, projectName, reportInput); | |||
reportProcessingScheduler.startAnalysisTaskNow(); | |||
WsCe.SubmitResponse submitResponse = WsCe.SubmitResponse.newBuilder() | |||
.setTaskId(task.getUuid()) |
@@ -64,7 +64,8 @@ import org.sonar.server.component.DefaultRubyComponentService; | |||
import org.sonar.server.component.ws.ComponentsWs; | |||
import org.sonar.server.component.ws.EventsWs; | |||
import org.sonar.server.component.ws.ResourcesWs; | |||
import org.sonar.server.computation.CeQueue; | |||
import org.sonar.server.computation.CeQueueImpl; | |||
import org.sonar.server.computation.CeQueueCleaner; | |||
import org.sonar.server.computation.CeQueueInitializer; | |||
import org.sonar.server.computation.CleanReportQueueListener; | |||
import org.sonar.server.computation.ComputeEngineProcessingModule; | |||
@@ -718,7 +719,7 @@ public class PlatformLevel4 extends PlatformLevel { | |||
// Compute engine | |||
CEQueueStatusImpl.class, | |||
ComputeEngineQueueMonitor.class, | |||
CeQueue.class, | |||
CeQueueImpl.class, | |||
CleanReportQueueListener.class, | |||
ReportFiles.class, | |||
ComputeEngineProcessingModule.class, | |||
@@ -734,6 +735,7 @@ public class PlatformLevel4 extends PlatformLevel { | |||
ProjectSettingsFactory.class, | |||
IndexPurgeListener.class, | |||
ReportSubmitter.class, | |||
CeQueueCleaner.class, | |||
CeQueueInitializer.class, | |||
// Views plugin | |||
ViewsBootstrap.class, |
@@ -52,7 +52,7 @@ public abstract class AbstractStoppableExecutorService<T extends ExecutorService | |||
if (!delegate.awaitTermination(5, TimeUnit.SECONDS)) { | |||
// Cancel currently executing tasks | |||
delegate.shutdownNow(); | |||
// Wait a while for tasks to respond to being cancelled | |||
// Wait a while for tasks to respond to being canceled | |||
if (!delegate.awaitTermination(5, TimeUnit.SECONDS)) { | |||
Loggers.get(getClass()).error(format("Pool %s did not terminate", getClass().getSimpleName())); | |||
} |
@@ -1,4 +1,9 @@ | |||
{ | |||
"paging": { | |||
"pageIndex": 1, | |||
"pageSize": 10, | |||
"total": 233 | |||
}, | |||
"tasks": [ | |||
{ | |||
"id": "BU_dO1vsORa8_beWCwsP", |
@@ -0,0 +1,116 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation; | |||
import java.io.File; | |||
import java.io.IOException; | |||
import org.junit.Rule; | |||
import org.junit.Test; | |||
import org.junit.rules.TemporaryFolder; | |||
import org.mockito.Mockito; | |||
import org.sonar.api.platform.ServerUpgradeStatus; | |||
import org.sonar.api.utils.System2; | |||
import org.sonar.db.DbSession; | |||
import org.sonar.db.DbTester; | |||
import org.sonar.db.ce.CeQueueDto; | |||
import org.sonar.db.ce.CeTaskTypes; | |||
import static java.util.Arrays.asList; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.mockito.Matchers.any; | |||
import static org.mockito.Matchers.eq; | |||
import static org.mockito.Mockito.mock; | |||
import static org.mockito.Mockito.verify; | |||
import static org.mockito.Mockito.when; | |||
public class CeQueueCleanerTest { | |||
@Rule | |||
public DbTester dbTester = DbTester.create(System2.INSTANCE); | |||
@Rule | |||
public TemporaryFolder tempFolder = new TemporaryFolder(); | |||
ServerUpgradeStatus serverUpgradeStatus = mock(ServerUpgradeStatus.class); | |||
ReportFiles reportFiles = mock(ReportFiles.class, Mockito.RETURNS_DEEP_STUBS); | |||
CeQueueImpl queue = mock(CeQueueImpl.class); | |||
CeQueueCleaner underTest = new CeQueueCleaner(dbTester.getDbClient(), serverUpgradeStatus, reportFiles, queue); | |||
@Test | |||
public void reset_in_progress_tasks_to_pending() throws IOException { | |||
insertInQueue("TASK_1", CeQueueDto.Status.PENDING); | |||
insertInQueue("TASK_2", CeQueueDto.Status.IN_PROGRESS); | |||
underTest.clean(dbTester.getSession()); | |||
assertThat(dbTester.getDbClient().ceQueueDao().countByStatus(dbTester.getSession(), CeQueueDto.Status.PENDING)).isEqualTo(2); | |||
assertThat(dbTester.getDbClient().ceQueueDao().countByStatus(dbTester.getSession(), CeQueueDto.Status.IN_PROGRESS)).isEqualTo(0); | |||
} | |||
@Test | |||
public void clear_queue_if_version_upgrade() { | |||
when(serverUpgradeStatus.isUpgraded()).thenReturn(true); | |||
underTest.clean(dbTester.getSession()); | |||
verify(queue).clear(); | |||
} | |||
@Test | |||
public void cancel_task_if_report_file_is_missing() throws IOException { | |||
CeQueueDto task = insertInQueue("TASK_1", CeQueueDto.Status.PENDING, false); | |||
underTest.clean(dbTester.getSession()); | |||
verify(queue).cancel(any(DbSession.class), eq(task)); | |||
} | |||
@Test | |||
public void delete_orphan_report_files() throws Exception { | |||
// two files on disk but on task in queue | |||
insertInQueue("TASK_1", CeQueueDto.Status.PENDING, true); | |||
when(reportFiles.listUuids()).thenReturn(asList("TASK_1", "TASK_2")); | |||
underTest.clean(dbTester.getSession()); | |||
verify(reportFiles).deleteIfExists("TASK_2"); | |||
} | |||
private void insertInQueue(String taskUuid, CeQueueDto.Status status) throws IOException { | |||
insertInQueue(taskUuid, status, true); | |||
} | |||
private CeQueueDto insertInQueue(String taskUuid, CeQueueDto.Status status, boolean createFile) throws IOException { | |||
CeQueueDto queueDto = new CeQueueDto(); | |||
queueDto.setTaskType(CeTaskTypes.REPORT); | |||
queueDto.setComponentUuid("PROJECT_1"); | |||
queueDto.setUuid(taskUuid); | |||
queueDto.setStatus(status); | |||
dbTester.getDbClient().ceQueueDao().insert(dbTester.getSession(), queueDto); | |||
dbTester.getSession().commit(); | |||
File file = tempFolder.newFile(); | |||
when(reportFiles.fileForUuid(taskUuid)).thenReturn(file); | |||
if (!createFile) { | |||
file.delete(); | |||
} | |||
return queueDto; | |||
} | |||
} |
@@ -43,7 +43,7 @@ import static org.mockito.Mockito.never; | |||
import static org.mockito.Mockito.verify; | |||
import static org.mockito.Mockito.verifyZeroInteractions; | |||
public class CeQueueTest { | |||
public class CeQueueImplTest { | |||
@Rule | |||
public ExpectedException expectedException = ExpectedException.none(); | |||
@@ -56,21 +56,21 @@ public class CeQueueTest { | |||
UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE; | |||
CEQueueStatus queueStatus = new CEQueueStatusImpl(); | |||
CeQueueListener listener = mock(CeQueueListener.class); | |||
CeQueue underTest = new CeQueue(system2, dbTester.getDbClient(), uuidFactory, queueStatus, new CeQueueListener[] {listener}); | |||
CeQueue underTest = new CeQueueImpl(system2, dbTester.getDbClient(), uuidFactory, queueStatus, new CeQueueListener[] {listener}); | |||
@Test | |||
public void test_submit() { | |||
CeTaskSubmit submit = underTest.prepareSubmit(); | |||
submit.setComponentUuid("PROJECT_1"); | |||
submit.setType(CeTaskTypes.REPORT); | |||
submit.setSubmitterLogin("rob"); | |||
TaskSubmission submission = underTest.prepareSubmit(); | |||
submission.setComponentUuid("PROJECT_1"); | |||
submission.setType(CeTaskTypes.REPORT); | |||
submission.setSubmitterLogin("rob"); | |||
CeTask task = underTest.submit(submit); | |||
assertThat(task.getUuid()).isEqualTo(submit.getUuid()); | |||
CeTask task = underTest.submit(submission); | |||
assertThat(task.getUuid()).isEqualTo(submission.getUuid()); | |||
assertThat(task.getComponentUuid()).isEqualTo("PROJECT_1"); | |||
assertThat(task.getSubmitterLogin()).isEqualTo("rob"); | |||
Optional<CeQueueDto> queueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), submit.getUuid()); | |||
Optional<CeQueueDto> queueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), submission.getUuid()); | |||
assertThat(queueDto.isPresent()).isTrue(); | |||
assertThat(queueDto.get().getTaskType()).isEqualTo(CeTaskTypes.REPORT); | |||
assertThat(queueDto.get().getComponentUuid()).isEqualTo("PROJECT_1"); | |||
@@ -163,7 +163,7 @@ public class CeQueueTest { | |||
@Test | |||
public void fail_to_cancel_if_in_progress() throws Exception { | |||
expectedException.expect(IllegalStateException.class); | |||
expectedException.expectMessage(startsWith("Task is in progress and can't be cancelled")); | |||
expectedException.expectMessage(startsWith("Task is in progress and can't be canceled")); | |||
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); | |||
underTest.peek(); | |||
@@ -211,9 +211,9 @@ public class CeQueueTest { | |||
} | |||
private CeTask submit(String reportType, String componentUuid) { | |||
CeTaskSubmit submit = underTest.prepareSubmit(); | |||
submit.setType(reportType); | |||
submit.setComponentUuid(componentUuid); | |||
return underTest.submit(submit); | |||
TaskSubmission submission = underTest.prepareSubmit(); | |||
submission.setType(reportType); | |||
submission.setComponentUuid(componentUuid); | |||
return underTest.submit(submission); | |||
} | |||
} |
@@ -24,8 +24,8 @@ import java.io.IOException; | |||
import org.junit.Rule; | |||
import org.junit.Test; | |||
import org.junit.rules.TemporaryFolder; | |||
import org.mockito.InOrder; | |||
import org.mockito.Mockito; | |||
import org.sonar.api.platform.ServerUpgradeStatus; | |||
import org.sonar.api.utils.System2; | |||
import org.sonar.db.DbSession; | |||
import org.sonar.db.DbTester; | |||
@@ -34,12 +34,9 @@ import org.sonar.db.ce.CeTaskTypes; | |||
import org.sonar.server.computation.monitoring.CEQueueStatus; | |||
import org.sonar.server.computation.monitoring.CEQueueStatusImpl; | |||
import static java.util.Arrays.asList; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.mockito.Matchers.any; | |||
import static org.mockito.Matchers.eq; | |||
import static org.mockito.Mockito.mock; | |||
import static org.mockito.Mockito.verify; | |||
import static org.mockito.Mockito.when; | |||
public class CeQueueInitializerTest { | |||
@@ -50,11 +47,11 @@ public class CeQueueInitializerTest { | |||
@Rule | |||
public TemporaryFolder tempFolder = new TemporaryFolder(); | |||
ServerUpgradeStatus serverUpgradeStatus = mock(ServerUpgradeStatus.class); | |||
ReportFiles reportFiles = mock(ReportFiles.class, Mockito.RETURNS_DEEP_STUBS); | |||
CeQueue queue = mock(CeQueue.class); | |||
CEQueueStatus queueStatus = new CEQueueStatusImpl(); | |||
CeQueueInitializer underTest = new CeQueueInitializer(dbTester.getDbClient(), serverUpgradeStatus, reportFiles, queue, queueStatus); | |||
CeQueueCleaner cleaner = mock(CeQueueCleaner.class); | |||
ReportProcessingScheduler scheduler = mock(ReportProcessingScheduler.class); | |||
CeQueueInitializer underTest = new CeQueueInitializer(dbTester.getDbClient(), queueStatus, cleaner, scheduler); | |||
@Test | |||
public void init_jmx_counters() throws IOException { | |||
@@ -76,43 +73,13 @@ public class CeQueueInitializerTest { | |||
} | |||
@Test | |||
public void reset_in_progress_tasks_to_pending() throws IOException { | |||
insertInQueue("TASK_1", CeQueueDto.Status.PENDING); | |||
insertInQueue("TASK_2", CeQueueDto.Status.IN_PROGRESS); | |||
underTest.start(); | |||
assertThat(dbTester.getDbClient().ceQueueDao().countByStatus(dbTester.getSession(), CeQueueDto.Status.PENDING)).isEqualTo(2); | |||
assertThat(dbTester.getDbClient().ceQueueDao().countByStatus(dbTester.getSession(), CeQueueDto.Status.IN_PROGRESS)).isEqualTo(0); | |||
} | |||
@Test | |||
public void clear_queue_if_version_upgrade() { | |||
when(serverUpgradeStatus.isUpgraded()).thenReturn(true); | |||
underTest.start(); | |||
verify(queue).clear(); | |||
} | |||
@Test | |||
public void cancel_task_if_report_file_is_missing() throws IOException { | |||
CeQueueDto task = insertInQueue("TASK_1", CeQueueDto.Status.PENDING, false); | |||
underTest.start(); | |||
verify(queue).cancel(any(DbSession.class), eq(task)); | |||
} | |||
@Test | |||
public void delete_orphan_report_files() throws Exception { | |||
// two files on disk but on task in queue | |||
insertInQueue("TASK_1", CeQueueDto.Status.PENDING, true); | |||
when(reportFiles.listUuids()).thenReturn(asList("TASK_1", "TASK_2")); | |||
public void clean_queue_then_start_scheduler_of_workers() throws IOException { | |||
InOrder inOrder = Mockito.inOrder(cleaner, scheduler); | |||
underTest.start(); | |||
verify(reportFiles).deleteIfExists("TASK_2"); | |||
inOrder.verify(cleaner).clean(any(DbSession.class)); | |||
inOrder.verify(scheduler).schedule(); | |||
} | |||
private void insertInQueue(String taskUuid, CeQueueDto.Status status) throws IOException { |
@@ -26,15 +26,13 @@ import org.sonar.db.ce.CeTaskTypes; | |||
import static org.mockito.Mockito.doThrow; | |||
import static org.mockito.Mockito.mock; | |||
import static org.mockito.Mockito.never; | |||
import static org.mockito.Mockito.verify; | |||
import static org.mockito.Mockito.verifyNoMoreInteractions; | |||
import static org.mockito.Mockito.verifyZeroInteractions; | |||
import static org.mockito.Mockito.when; | |||
public class CeWorkerImplTest { | |||
CeQueue queue = mock(CeQueue.class); | |||
CeQueue queue = mock(CeQueueImpl.class); | |||
ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class); | |||
CeWorker underTest = new CeWorkerImpl(queue, taskProcessor); | |||
@@ -26,12 +26,10 @@ import java.util.concurrent.TimeUnit; | |||
import org.junit.Test; | |||
import org.mockito.invocation.InvocationOnMock; | |||
import org.mockito.stubbing.Answer; | |||
import org.sonar.api.platform.Server; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.mockito.Matchers.any; | |||
import static org.mockito.Matchers.eq; | |||
import static org.mockito.Mockito.doAnswer; | |||
import static org.mockito.Mockito.mock; | |||
import static org.mockito.Mockito.when; | |||
@@ -44,20 +42,10 @@ public class ReportProcessingSchedulerTest { | |||
@Test | |||
public void schedule_at_fixed_rate_adding_a_ReportProcessingTask_to_the_queue() throws Exception { | |||
when(batchExecutorService.scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(10L), eq(TimeUnit.SECONDS))) | |||
when(batchExecutorService.scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(1L), eq(TimeUnit.SECONDS))) | |||
.thenAnswer(new ExecuteFirstArgAsRunnable()); | |||
underTest.onServerStart(mock(Server.class)); | |||
assertThat(processingQueue.getTasks()).hasSize(1); | |||
assertThat(processingQueue.getTasks().iterator().next()).isInstanceOf(CeWorker.class); | |||
} | |||
@Test | |||
public void adds_immediately_a_ReportProcessingTask_to_the_queue() throws Exception { | |||
doAnswer(new ExecuteFirstArgAsRunnable()).when(batchExecutorService).execute(any(Runnable.class)); | |||
underTest.startAnalysisTaskNow(); | |||
underTest.schedule(); | |||
assertThat(processingQueue.getTasks()).hasSize(1); | |||
assertThat(processingQueue.getTasks().iterator().next()).isInstanceOf(CeWorker.class); |
@@ -42,22 +42,22 @@ public class ReportSubmitterTest { | |||
@Rule | |||
public UserSessionRule userSession = UserSessionRule.standalone(); | |||
CeQueue queue = mock(CeQueue.class); | |||
CeQueue queue = mock(CeQueueImpl.class); | |||
ReportFiles reportFiles = mock(ReportFiles.class); | |||
ComponentService componentService = mock(ComponentService.class); | |||
ReportSubmitter underTest = new ReportSubmitter(queue, userSession, reportFiles, componentService); | |||
@Test | |||
public void submit_a_report_on_existing_project() { | |||
when(queue.prepareSubmit()).thenReturn(new CeTaskSubmit("TASK_1")); | |||
when(queue.prepareSubmit()).thenReturn(new TestTaskSubmission("TASK_1")); | |||
userSession.setGlobalPermissions(GlobalPermissions.SCAN_EXECUTION); | |||
when(componentService.getNullableByKey("MY_PROJECT")).thenReturn(new ComponentDto().setUuid("P1")); | |||
underTest.submit("MY_PROJECT", null, "My Project", IOUtils.toInputStream("{binary}")); | |||
verify(queue).submit(argThat(new TypeSafeMatcher<CeTaskSubmit>() { | |||
verify(queue).submit(argThat(new TypeSafeMatcher<TaskSubmission>() { | |||
@Override | |||
protected boolean matchesSafely(CeTaskSubmit submit) { | |||
protected boolean matchesSafely(TaskSubmission submit) { | |||
return submit.getType().equals(CeTaskTypes.REPORT) && submit.getComponentUuid().equals("P1") && | |||
submit.getUuid().equals("TASK_1"); | |||
} | |||
@@ -71,16 +71,16 @@ public class ReportSubmitterTest { | |||
@Test | |||
public void provision_project_if_does_not_exist() throws Exception { | |||
when(queue.prepareSubmit()).thenReturn(new CeTaskSubmit("TASK_1")); | |||
when(queue.prepareSubmit()).thenReturn(new TestTaskSubmission("TASK_1")); | |||
userSession.setGlobalPermissions(GlobalPermissions.SCAN_EXECUTION, GlobalPermissions.PROVISIONING); | |||
when(componentService.getNullableByKey("MY_PROJECT")).thenReturn(null); | |||
when(componentService.create(any(NewComponent.class))).thenReturn(new ComponentDto().setUuid("P1")); | |||
underTest.submit("MY_PROJECT", null, "My Project", IOUtils.toInputStream("{binary}")); | |||
verify(queue).submit(argThat(new TypeSafeMatcher<CeTaskSubmit>() { | |||
verify(queue).submit(argThat(new TypeSafeMatcher<TaskSubmission>() { | |||
@Override | |||
protected boolean matchesSafely(CeTaskSubmit submit) { | |||
protected boolean matchesSafely(TaskSubmission submit) { | |||
return submit.getType().equals(CeTaskTypes.REPORT) && submit.getComponentUuid().equals("P1") && | |||
submit.getUuid().equals("TASK_1"); | |||
} |
@@ -19,42 +19,53 @@ | |||
*/ | |||
package org.sonar.server.computation; | |||
public class CeTaskSubmit { | |||
import javax.annotation.Nullable; | |||
public class TestTaskSubmission implements TaskSubmission { | |||
private final String uuid; | |||
private String type; | |||
private String componentUuid; | |||
private String submitterLogin; | |||
CeTaskSubmit(String uuid) { | |||
public TestTaskSubmission(String uuid) { | |||
this.uuid = uuid; | |||
} | |||
@Override | |||
public String getUuid() { | |||
return uuid; | |||
} | |||
@Override | |||
public String getType() { | |||
return type; | |||
} | |||
public void setType(String type) { | |||
this.type = type; | |||
@Override | |||
public TaskSubmission setType(String s) { | |||
this.type = s; | |||
return this; | |||
} | |||
@Override | |||
public String getComponentUuid() { | |||
return componentUuid; | |||
} | |||
public void setComponentUuid(String s) { | |||
@Override | |||
public TaskSubmission setComponentUuid(@Nullable String s) { | |||
this.componentUuid = s; | |||
return this; | |||
} | |||
@Override | |||
public String getSubmitterLogin() { | |||
return submitterLogin; | |||
} | |||
public void setSubmitterLogin(String s) { | |||
@Override | |||
public TaskSubmission setSubmitterLogin(@Nullable String s) { | |||
this.submitterLogin = s; | |||
return this; | |||
} | |||
} |
@@ -20,7 +20,7 @@ | |||
package org.sonar.server.computation.monitoring; | |||
import org.junit.Test; | |||
import org.sonar.server.computation.CeQueue; | |||
import org.sonar.server.computation.CeQueueImpl; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.assertj.core.api.Assertions.entry; | |||
@@ -34,7 +34,7 @@ public class ComputeEngineQueueMonitorTest { | |||
private static final long SUCCESS_COUNT = 13; | |||
private static final long PROCESSING_TIME = 987; | |||
private ComputeEngineQueueMonitor underTest = new ComputeEngineQueueMonitor(new DumbCEQueueStatus(), mock(CeQueue.class)); | |||
private ComputeEngineQueueMonitor underTest = new ComputeEngineQueueMonitor(new DumbCEQueueStatus(), mock(CeQueueImpl.class)); | |||
@Test | |||
public void name_is_ComputeEngineQueue() { |
@@ -0,0 +1,174 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.ws; | |||
import java.util.Collections; | |||
import java.util.List; | |||
import org.junit.Rule; | |||
import org.junit.Test; | |||
import org.sonar.api.server.ws.WebService; | |||
import org.sonar.api.utils.System2; | |||
import org.sonar.api.web.UserRole; | |||
import org.sonar.core.util.Protobuf; | |||
import org.sonar.db.DbTester; | |||
import org.sonar.db.ce.CeActivityDto; | |||
import org.sonar.db.ce.CeQueueDto; | |||
import org.sonar.db.ce.CeTaskTypes; | |||
import org.sonar.server.plugins.MimeTypes; | |||
import org.sonar.server.tester.UserSessionRule; | |||
import org.sonar.server.ws.TestResponse; | |||
import org.sonar.server.ws.WsActionTester; | |||
import org.sonarqube.ws.WsCe; | |||
import static java.util.Arrays.asList; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
public class CeActivityWsActionTest { | |||
@Rule | |||
public UserSessionRule userSession = UserSessionRule.standalone(); | |||
@Rule | |||
public DbTester dbTester = DbTester.create(System2.INSTANCE); | |||
CeWsTaskFormatter formatter = new CeWsTaskFormatter(dbTester.getDbClient()); | |||
CeActivityWsAction underTest = new CeActivityWsAction(userSession, dbTester.getDbClient(), formatter); | |||
WsActionTester tester = new WsActionTester(underTest); | |||
@Test | |||
public void get_all_past_activity() { | |||
userSession.setGlobalPermissions(UserRole.ADMIN); | |||
insert("T1", "PROJECT_1", CeActivityDto.Status.SUCCESS); | |||
insert("T2", "PROJECT_2", CeActivityDto.Status.FAILED); | |||
TestResponse wsResponse = tester.newRequest() | |||
.setMediaType(MimeTypes.PROTOBUF) | |||
.execute(); | |||
// verify the protobuf response | |||
WsCe.ActivityResponse activityResponse = Protobuf.read(wsResponse.getInputStream(), WsCe.ActivityResponse.PARSER); | |||
assertThat(activityResponse.getTasksCount()).isEqualTo(2); | |||
// chronological order, from newest to oldest | |||
assertThat(activityResponse.getTasks(0).getId()).isEqualTo("T2"); | |||
assertThat(activityResponse.getTasks(0).getStatus()).isEqualTo(WsCe.TaskStatus.FAILED); | |||
assertThat(activityResponse.getTasks(0).getComponentId()).isEqualTo("PROJECT_2"); | |||
assertThat(activityResponse.getTasks(0).getExecutionTimeMs()).isEqualTo(500L); | |||
assertThat(activityResponse.getTasks(1).getId()).isEqualTo("T1"); | |||
assertThat(activityResponse.getTasks(1).getStatus()).isEqualTo(WsCe.TaskStatus.SUCCESS); | |||
assertThat(activityResponse.getTasks(1).getComponentId()).isEqualTo("PROJECT_1"); | |||
} | |||
@Test | |||
public void filter_by_status() { | |||
userSession.setGlobalPermissions(UserRole.ADMIN); | |||
insert("T1", "PROJECT_1", CeActivityDto.Status.SUCCESS); | |||
insert("T2", "PROJECT_2", CeActivityDto.Status.FAILED); | |||
TestResponse wsResponse = tester.newRequest() | |||
.setParam("status", "FAILED") | |||
.setMediaType(MimeTypes.PROTOBUF) | |||
.execute(); | |||
WsCe.ActivityResponse activityResponse = Protobuf.read(wsResponse.getInputStream(), WsCe.ActivityResponse.PARSER); | |||
assertThat(activityResponse.getTasksCount()).isEqualTo(1); | |||
assertThat(activityResponse.getTasks(0).getId()).isEqualTo("T2"); | |||
} | |||
@Test | |||
public void filter_on_current_activities() { | |||
userSession.setGlobalPermissions(UserRole.ADMIN); | |||
// T2 is the current activity (the most recent one) | |||
insert("T1", "PROJECT_1", CeActivityDto.Status.SUCCESS); | |||
insert("T2", "PROJECT_1", CeActivityDto.Status.FAILED); | |||
TestResponse wsResponse = tester.newRequest() | |||
.setParam("onlyCurrents", "true") | |||
.setMediaType(MimeTypes.PROTOBUF) | |||
.execute(); | |||
WsCe.ActivityResponse activityResponse = Protobuf.read(wsResponse.getInputStream(), WsCe.ActivityResponse.PARSER); | |||
assertThat(activityResponse.getTasksCount()).isEqualTo(1); | |||
assertThat(activityResponse.getTasks(0).getId()).isEqualTo("T2"); | |||
} | |||
@Test | |||
public void paginate_results() { | |||
userSession.setGlobalPermissions(UserRole.ADMIN); | |||
insert("T1", "PROJECT_1", CeActivityDto.Status.SUCCESS); | |||
insert("T2", "PROJECT_2", CeActivityDto.Status.FAILED); | |||
assertPage(1, 1, 2, asList("T2")); | |||
assertPage(2, 1, 2, asList("T1")); | |||
assertPage(1, 10, 2, asList("T2", "T1")); | |||
assertPage(2, 10, 2, Collections.<String>emptyList()); | |||
} | |||
private void assertPage(int pageIndex, int pageSize, int expectedTotal, List<String> expectedOrderedTaskIds) { | |||
TestResponse wsResponse = tester.newRequest() | |||
.setMediaType(MimeTypes.PROTOBUF) | |||
.setParam(WebService.Param.PAGE, Integer.toString(pageIndex)) | |||
.setParam(WebService.Param.PAGE_SIZE, Integer.toString(pageSize)) | |||
.execute(); | |||
WsCe.ActivityResponse activityResponse = Protobuf.read(wsResponse.getInputStream(), WsCe.ActivityResponse.PARSER); | |||
assertThat(activityResponse.getPaging().getPageIndex()).isEqualTo(pageIndex); | |||
assertThat(activityResponse.getPaging().getPageSize()).isEqualTo(pageSize); | |||
assertThat(activityResponse.getPaging().getTotal()).isEqualTo(expectedTotal); | |||
assertThat(activityResponse.getTasksCount()).isEqualTo(expectedOrderedTaskIds.size()); | |||
for (int i = 0; i < expectedOrderedTaskIds.size(); i++) { | |||
String expectedTaskId = expectedOrderedTaskIds.get(i); | |||
assertThat(activityResponse.getTasks(i).getId()).isEqualTo(expectedTaskId); | |||
} | |||
} | |||
@Test | |||
public void get_project_activity() { | |||
userSession.addProjectUuidPermissions(UserRole.ADMIN, "PROJECT_1"); | |||
insert("T1", "PROJECT_1", CeActivityDto.Status.SUCCESS); | |||
insert("T2", "PROJECT_2", CeActivityDto.Status.FAILED); | |||
TestResponse wsResponse = tester.newRequest() | |||
.setParam("componentId", "PROJECT_1") | |||
.setMediaType(MimeTypes.PROTOBUF) | |||
.execute(); | |||
// verify the protobuf response | |||
WsCe.ActivityResponse activityResponse = Protobuf.read(wsResponse.getInputStream(), WsCe.ActivityResponse.PARSER); | |||
assertThat(activityResponse.getTasksCount()).isEqualTo(1); | |||
assertThat(activityResponse.getTasks(0).getId()).isEqualTo("T1"); | |||
assertThat(activityResponse.getTasks(0).getStatus()).isEqualTo(WsCe.TaskStatus.SUCCESS); | |||
assertThat(activityResponse.getTasks(0).getComponentId()).isEqualTo("PROJECT_1"); | |||
} | |||
private CeActivityDto insert(String taskUuid, String componentUuid, CeActivityDto.Status status) { | |||
CeQueueDto queueDto = new CeQueueDto(); | |||
queueDto.setTaskType(CeTaskTypes.REPORT); | |||
queueDto.setComponentUuid(componentUuid); | |||
queueDto.setUuid(taskUuid); | |||
CeActivityDto activityDto = new CeActivityDto(queueDto); | |||
activityDto.setStatus(status); | |||
activityDto.setExecutionTimeMs(500L); | |||
dbTester.getDbClient().ceActivityDao().insert(dbTester.getSession(), activityDto); | |||
dbTester.getSession().commit(); | |||
return activityDto; | |||
} | |||
} |
@@ -25,7 +25,6 @@ import org.mockito.Matchers; | |||
import org.sonar.core.util.Protobuf; | |||
import org.sonar.db.ce.CeTaskTypes; | |||
import org.sonar.server.computation.CeTask; | |||
import org.sonar.server.computation.ReportProcessingScheduler; | |||
import org.sonar.server.computation.ReportSubmitter; | |||
import org.sonar.server.plugins.MimeTypes; | |||
import org.sonar.server.ws.TestResponse; | |||
@@ -43,8 +42,7 @@ import static org.mockito.Mockito.when; | |||
public class CeSubmitWsActionTest { | |||
ReportSubmitter reportSubmitter = mock(ReportSubmitter.class); | |||
ReportProcessingScheduler reportProcessingScheduler = mock(ReportProcessingScheduler.class); | |||
CeSubmitWsAction underTest = new CeSubmitWsAction(reportSubmitter, reportProcessingScheduler); | |||
CeSubmitWsAction underTest = new CeSubmitWsAction(reportSubmitter); | |||
WsActionTester tester = new WsActionTester(underTest); | |||
@Test | |||
@@ -61,16 +59,14 @@ public class CeSubmitWsActionTest { | |||
.execute(); | |||
verify(reportSubmitter).submit(eq("my_project"), Matchers.isNull(String.class), eq("My Project"), any(InputStream.class)); | |||
verify(reportProcessingScheduler).startAnalysisTaskNow(); | |||
// verify the protobuf response | |||
WsCe.SubmitResponse submitResponse = Protobuf.read(wsResponse.getInputStream(), WsCe.SubmitResponse.PARSER); | |||
assertThat(submitResponse.getTaskId()).isEqualTo("TASK_1"); | |||
assertThat(submitResponse.getProjectId()).isEqualTo("PROJECT_1"); | |||
} | |||
@Test | |||
public void test_response_example() { | |||
public void test_example_json_response() { | |||
CeTask task = new CeTask("TASK_1", CeTaskTypes.REPORT, "PROJECT_1", "robert"); | |||
when(reportSubmitter.submit(eq("my_project"), Matchers.isNull(String.class), eq("My Project"), any(InputStream.class))).thenReturn(task); | |||
@@ -22,7 +22,6 @@ package org.sonar.server.computation.ws; | |||
import org.junit.Test; | |||
import org.mockito.Mockito; | |||
import org.sonar.api.server.ws.WebService; | |||
import org.sonar.server.computation.ReportProcessingScheduler; | |||
import org.sonar.server.computation.ReportSubmitter; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
@@ -32,7 +31,7 @@ public class CeWsTest { | |||
@Test | |||
public void define() throws Exception { | |||
CeWsAction wsAction = new CeSubmitWsAction(mock(ReportSubmitter.class), mock(ReportProcessingScheduler.class)); | |||
CeWsAction wsAction = new CeSubmitWsAction(mock(ReportSubmitter.class)); | |||
CeWs ws = new CeWs(wsAction); | |||
WebService.Context context = mock(WebService.Context.class, Mockito.RETURNS_DEEP_STUBS); |
@@ -39,6 +39,7 @@ class CreateCeActivity < ActiveRecord::Migration | |||
t.column 'execution_time_ms', :big_integer, :null => true | |||
end | |||
add_index 'ce_activity', 'uuid', :name => 'ce_activity_uuid', :unique => true | |||
add_index 'ce_activity', 'component_uuid', :name => 'ce_activity_component_uuid' | |||
end | |||
end |
@@ -19,9 +19,11 @@ | |||
*/ | |||
package org.sonar.batch.util; | |||
import com.google.common.base.Strings; | |||
import java.io.UnsupportedEncodingException; | |||
import java.net.URLEncoder; | |||
import javax.annotation.Nullable; | |||
import org.apache.commons.lang.StringUtils; | |||
public class BatchUtils { | |||
@@ -38,9 +40,9 @@ public class BatchUtils { | |||
return StringUtils.replace(cleanKey, ":", "_"); | |||
} | |||
public static String encodeForUrl(String url) { | |||
public static String encodeForUrl(@Nullable String url) { | |||
try { | |||
return URLEncoder.encode(url, "UTF-8"); | |||
return URLEncoder.encode(Strings.nullToEmpty(url), "UTF-8"); | |||
} catch (UnsupportedEncodingException e) { | |||
throw new IllegalStateException("Encoding not supported", e); |
@@ -0,0 +1,35 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.batch.util; | |||
import org.junit.Test; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
public class BatchUtilsTest { | |||
@Test | |||
public void encodeForUrl() throws Exception { | |||
assertThat(BatchUtils.encodeForUrl(null)).isEqualTo(""); | |||
assertThat(BatchUtils.encodeForUrl("")).isEqualTo(""); | |||
assertThat(BatchUtils.encodeForUrl("foo")).isEqualTo("foo"); | |||
assertThat(BatchUtils.encodeForUrl("foo&bar")).isEqualTo("foo%26bar"); | |||
} | |||
} |
@@ -57,12 +57,16 @@ public class CeActivityDao implements Dao { | |||
} | |||
/** | |||
* Ordered by id asc -> oldest to newest | |||
* Ordered by id desc -> newest to oldest | |||
*/ | |||
public List<CeActivityDto> selectByQuery(DbSession dbSession, CeActivityQuery query, RowBounds rowBounds) { | |||
return mapper(dbSession).selectByQuery(query, rowBounds); | |||
} | |||
public int countByQuery(DbSession dbSession, CeActivityQuery query) { | |||
return mapper(dbSession).countByQuery(query); | |||
} | |||
private CeActivityMapper mapper(DbSession dbSession) { | |||
return dbSession.getMapper(CeActivityMapper.class); | |||
} |
@@ -35,6 +35,8 @@ public interface CeActivityMapper { | |||
List<CeActivityDto> selectByQuery(@Param("query") CeActivityQuery query, RowBounds rowBounds); | |||
int countByQuery(@Param("query") CeActivityQuery query); | |||
void insert(CeActivityDto dto); | |||
void updateIsLastToFalseForLastKey(@Param("isLastKey") String isLastKey, @Param("updatedAt") long updatedAt); |
@@ -19,6 +19,9 @@ | |||
*/ | |||
package org.sonar.db.ce; | |||
import javax.annotation.CheckForNull; | |||
import javax.annotation.Nullable; | |||
public class CeActivityQuery { | |||
private boolean onlyCurrents = false; | |||
@@ -26,11 +29,12 @@ public class CeActivityQuery { | |||
private CeActivityDto.Status status; | |||
private String type; | |||
@CheckForNull | |||
public String getComponentUuid() { | |||
return componentUuid; | |||
} | |||
public CeActivityQuery setComponentUuid(String componentUuid) { | |||
public CeActivityQuery setComponentUuid(@Nullable String componentUuid) { | |||
this.componentUuid = componentUuid; | |||
return this; | |||
} | |||
@@ -44,20 +48,22 @@ public class CeActivityQuery { | |||
return this; | |||
} | |||
@CheckForNull | |||
public CeActivityDto.Status getStatus() { | |||
return status; | |||
} | |||
public CeActivityQuery setStatus(CeActivityDto.Status status) { | |||
public CeActivityQuery setStatus(@Nullable CeActivityDto.Status status) { | |||
this.status = status; | |||
return this; | |||
} | |||
@CheckForNull | |||
public String getType() { | |||
return type; | |||
} | |||
public CeActivityQuery setType(String type) { | |||
public CeActivityQuery setType(@Nullable String type) { | |||
this.type = type; | |||
return this; | |||
} |
@@ -62,6 +62,25 @@ | |||
order by ca.id desc | |||
</select> | |||
<select id="countByQuery" parameterType="map" resultType="int"> | |||
select count(ca.id) | |||
from ce_activity ca | |||
<where> | |||
<if test="query.onlyCurrents"> | |||
ca.is_last=${_true} | |||
</if> | |||
<if test="query.componentUuid != null"> | |||
ca.component_uuid=#{query.componentUuid} | |||
</if> | |||
<if test="query.status != null"> | |||
ca.status=#{query.status} | |||
</if> | |||
<if test="query.type != null"> | |||
ca.task_type=#{query.type} | |||
</if> | |||
</where> | |||
</select> | |||
<insert id="insert" parameterType="org.sonar.db.ce.CeActivityDto" useGeneratedKeys="false"> | |||
insert into ce_activity | |||
(uuid, component_uuid, status, task_type, is_last, is_last_key, submitter_login, submitted_at, started_at, |
@@ -708,3 +708,5 @@ CREATE UNIQUE INDEX "PROJECT_QPROFILES_UNIQUE" ON "PROJECT_QPROFILES" ("PROJECT_ | |||
CREATE UNIQUE INDEX "CE_QUEUE_UUID" ON "CE_QUEUE" ("UUID"); | |||
CREATE UNIQUE INDEX "CE_ACTIVITY_UUID" ON "CE_ACTIVITY" ("UUID"); | |||
CREATE INDEX "CE_ACTIVITY_COMPONENT_UUID" ON "CE_ACTIVITY" ("COMPONENT_UUID"); |
@@ -110,6 +110,32 @@ public class CeActivityDaoTest { | |||
assertThat(dtos).extracting("uuid").containsExactly("TASK_4"); | |||
} | |||
@Test | |||
public void test_countByQuery() throws Exception { | |||
insert("TASK_1", REPORT, "PROJECT_1", CeActivityDto.Status.SUCCESS); | |||
insert("TASK_2", REPORT, "PROJECT_1", CeActivityDto.Status.FAILED); | |||
insert("TASK_3", REPORT, "PROJECT_2", CeActivityDto.Status.SUCCESS); | |||
insert("TASK_4", "views", null, CeActivityDto.Status.SUCCESS); | |||
// no filters | |||
CeActivityQuery query = new CeActivityQuery(); | |||
assertThat(underTest.countByQuery(db.getSession(), query)).isEqualTo(4); | |||
// select by component uuid | |||
query = new CeActivityQuery().setComponentUuid("PROJECT_1"); | |||
assertThat(underTest.countByQuery(db.getSession(), query)).isEqualTo(2); | |||
// select by status | |||
query = new CeActivityQuery().setStatus(CeActivityDto.Status.SUCCESS); | |||
assertThat(underTest.countByQuery(db.getSession(), query)).isEqualTo(3); | |||
// select by type | |||
query = new CeActivityQuery().setType(REPORT); | |||
assertThat(underTest.countByQuery(db.getSession(), query)).isEqualTo(3); | |||
query = new CeActivityQuery().setType("views"); | |||
assertThat(underTest.countByQuery(db.getSession(), query)).isEqualTo(1); | |||
} | |||
@Test | |||
public void deleteOlderThan() throws Exception { | |||
insertWithDate("TASK_1", 1_450_000_000_000L); |