@@ -26,6 +26,7 @@ import org.sonar.api.utils.log.Logger; | |||
import org.sonar.api.utils.log.Loggers; | |||
import org.sonar.core.util.logs.Profiler; | |||
import org.sonar.server.computation.activity.ActivityManager; | |||
import org.sonar.server.computation.monitoring.CEQueueStatus; | |||
import org.sonar.server.computation.step.ComputationStep; | |||
import org.sonar.server.computation.step.ComputationSteps; | |||
@@ -42,15 +43,19 @@ public class ComputationService { | |||
private final ComputationSteps steps; | |||
private final ActivityManager activityManager; | |||
private final System2 system; | |||
private final CEQueueStatus queueStatus; | |||
public ComputationService(ReportQueue.Item item, ComputationSteps steps, ActivityManager activityManager, System2 system) { | |||
public ComputationService(ReportQueue.Item item, ComputationSteps steps, ActivityManager activityManager, System2 system, | |||
CEQueueStatus queueStatus) { | |||
this.item = item; | |||
this.steps = steps; | |||
this.activityManager = activityManager; | |||
this.system = system; | |||
this.queueStatus = queueStatus; | |||
} | |||
public void process() { | |||
queueStatus.addInProgress(); | |||
String projectKey = item.dto.getProjectKey(); | |||
String message = format("Analysis of project %s (report %d)", projectKey, item.dto.getId()); | |||
Profiler profiler = Profiler.create(LOG).startDebug(message); | |||
@@ -64,8 +69,10 @@ public class ComputationService { | |||
timingSum += stepProfiler.stopInfo(step.getDescription()); | |||
} | |||
item.dto.setStatus(SUCCESS); | |||
queueStatus.addSuccess(); | |||
} catch (Throwable e) { | |||
item.dto.setStatus(FAILED); | |||
queueStatus.addError(); | |||
throw Throwables.propagate(e); | |||
} finally { | |||
item.dto.setFinishedAt(system.now()); |
@@ -30,12 +30,13 @@ import org.sonar.api.config.Settings; | |||
import org.sonar.api.server.ServerSide; | |||
import org.sonar.api.utils.internal.Uuids; | |||
import org.sonar.api.utils.log.Loggers; | |||
import org.sonar.db.compute.AnalysisReportDto; | |||
import org.sonar.db.DbClient; | |||
import org.sonar.db.DbSession; | |||
import org.sonar.db.MyBatis; | |||
import org.sonar.process.ProcessProperties; | |||
import org.sonar.db.compute.AnalysisReportDao; | |||
import org.sonar.db.DbClient; | |||
import org.sonar.db.compute.AnalysisReportDto; | |||
import org.sonar.process.ProcessProperties; | |||
import org.sonar.server.computation.monitoring.CEQueueStatus; | |||
import static org.sonar.db.compute.AnalysisReportDto.Status.PENDING; | |||
@@ -43,10 +44,21 @@ import static org.sonar.db.compute.AnalysisReportDto.Status.PENDING; | |||
public class ReportQueue { | |||
private final DbClient dbClient; | |||
private final Settings settings; | |||
private final CEQueueStatus queueStatus; | |||
public ReportQueue(DbClient dbClient, Settings settings) { | |||
public ReportQueue(DbClient dbClient, Settings settings, CEQueueStatus queueStatus) { | |||
this.dbClient = dbClient; | |||
this.settings = settings; | |||
this.queueStatus = queueStatus; | |||
} | |||
public void start() { | |||
DbSession session = dbClient.openSession(false); | |||
try { | |||
queueStatus.initPendingCount(dao().countPending(session)); | |||
} finally { | |||
MyBatis.closeQuietly(session); | |||
} | |||
} | |||
public Item add(String projectKey, String projectName, InputStream reportData) { |
@@ -0,0 +1,103 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.monitoring; | |||
public interface CEQueueStatus { | |||
/** | |||
* Adds 1 to the count of received batch reports and 1 to the count of batch reports waiting for processing. | |||
* | |||
* @return the new count of received batch reports | |||
* | |||
* @see #getReceivedCount() | |||
* @see #getPendingCount() | |||
*/ | |||
long addReceived(); | |||
/** | |||
* Count of received batch reports since instance startup | |||
*/ | |||
long getReceivedCount(); | |||
/** | |||
* Sets the count of reports waiting for processing at startup. This method can be called only once. | |||
* | |||
* @param initialPendingCount the count of reports, must be {@literal >=} 0 | |||
* | |||
* @return the new count of batch reports waiting for processing (which is the same as the argument) | |||
* | |||
* @throws IllegalStateException if this method has already been called or is called after {@link #getPendingCount()} | |||
* @throws IllegalArgumentException if the argument is {@literal <} 0 | |||
*/ | |||
long initPendingCount(long initialPendingCount); | |||
/** | |||
* Count of batch report waiting for processing since startup, including reports received before instance startup. | |||
*/ | |||
long getPendingCount(); | |||
/** | |||
* Adds 1 to the count of batch reports under processing and removes 1 from the count of batch reports waiting for | |||
* processing. | |||
* | |||
* @return the new count of batch report under processing | |||
* | |||
* @see #getInProgressCount() | |||
* @see #getPendingCount() | |||
*/ | |||
long addInProgress(); | |||
/** | |||
* Count of batch report under processing. | |||
*/ | |||
long getInProgressCount(); | |||
/** | |||
* Adds 1 to the count of batch report which processing ended with an error and removes 1 from the count of batch | |||
* report under processing. | |||
* | |||
* @return the new count of batch report which processing ended with an error | |||
* | |||
* @see #getErrorCount() | |||
* @see #getInProgressCount() | |||
*/ | |||
long addError(); | |||
/** | |||
* Count of batch report which processing ended with an error since instance startup. | |||
*/ | |||
long getErrorCount(); | |||
/** | |||
* Adds 1 to the count of batch report which processing ended successfully and removes 1 from the count of batch | |||
* report under processing. | |||
* | |||
* @return the new count of batch report which processing ended successfully | |||
* | |||
* @see #getSuccessCount() | |||
* @see #getInProgressCount() | |||
*/ | |||
long addSuccess(); | |||
/** | |||
* Count of batch report which processing ended successfully since instance startup. | |||
*/ | |||
long getSuccessCount(); | |||
} |
@@ -0,0 +1,103 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.monitoring; | |||
import java.util.concurrent.atomic.AtomicLong; | |||
import static com.google.common.base.Preconditions.checkArgument; | |||
public class CEQueueStatusImpl implements CEQueueStatus { | |||
private static final long PENDING_INITIAL_VALUE = Long.MIN_VALUE; | |||
private final AtomicLong received = new AtomicLong(0); | |||
private final AtomicLong pending = new AtomicLong(PENDING_INITIAL_VALUE); | |||
private final AtomicLong inProgress = new AtomicLong(0); | |||
private final AtomicLong error = new AtomicLong(0); | |||
private final AtomicLong success = new AtomicLong(0); | |||
@Override | |||
public long addReceived() { | |||
// initPendingCount might not have been called yet | |||
if (!pending.compareAndSet(PENDING_INITIAL_VALUE, 1)) { | |||
pending.incrementAndGet(); | |||
} | |||
return received.incrementAndGet(); | |||
} | |||
@Override | |||
public long getReceivedCount() { | |||
return received.get(); | |||
} | |||
@Override | |||
public long initPendingCount(long initialPendingCount) { | |||
checkArgument(initialPendingCount >= 0, "Initial pending count must be >= 0"); | |||
if (!pending.compareAndSet(PENDING_INITIAL_VALUE, initialPendingCount)) { | |||
throw new IllegalStateException("Method initPendingCount must be used before any other method and can not be called twice"); | |||
} | |||
return initialPendingCount; | |||
} | |||
@Override | |||
public long getPendingCount() { | |||
ensurePendingIsInitialized(); | |||
return pending.get(); | |||
} | |||
@Override | |||
public long addInProgress() { | |||
ensurePendingIsInitialized(); | |||
pending.decrementAndGet(); | |||
return inProgress.incrementAndGet(); | |||
} | |||
private void ensurePendingIsInitialized() { | |||
pending.compareAndSet(PENDING_INITIAL_VALUE, 0); | |||
} | |||
@Override | |||
public long getInProgressCount() { | |||
return inProgress.get(); | |||
} | |||
@Override | |||
public long addError() { | |||
inProgress.decrementAndGet(); | |||
return error.incrementAndGet(); | |||
} | |||
@Override | |||
public long getErrorCount() { | |||
return error.get(); | |||
} | |||
@Override | |||
public long addSuccess() { | |||
inProgress.decrementAndGet(); | |||
return success.incrementAndGet(); | |||
} | |||
@Override | |||
public long getSuccessCount() { | |||
return success.get(); | |||
} | |||
} |
@@ -0,0 +1,72 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.monitoring; | |||
import java.util.LinkedHashMap; | |||
import org.sonar.server.platform.monitoring.BaseMonitorMBean; | |||
public class ComputeEngineQueueMonitor extends BaseMonitorMBean implements ComputeEngineQueueMonitorMBean { | |||
private final CEQueueStatus queueStatus; | |||
public ComputeEngineQueueMonitor(CEQueueStatus queueStatus) { | |||
this.queueStatus = queueStatus; | |||
} | |||
@Override | |||
public String name() { | |||
return "ComputeEngineQueue"; | |||
} | |||
@Override | |||
public LinkedHashMap<String, Object> attributes() { | |||
LinkedHashMap<String, Object> attributes = new LinkedHashMap<>(); | |||
attributes.put("Received reports", getReceivedCount()); | |||
attributes.put("Pending reports", getPendingCount()); | |||
attributes.put("In progress", getInProgressCount()); | |||
attributes.put("Successfully processed", getSuccessCount()); | |||
attributes.put("Processed with error", getErrorCount()); | |||
return attributes; | |||
} | |||
@Override | |||
public long getReceivedCount() { | |||
return queueStatus.getReceivedCount(); | |||
} | |||
@Override | |||
public long getPendingCount() { | |||
return queueStatus.getPendingCount(); | |||
} | |||
@Override | |||
public long getInProgressCount() { | |||
return queueStatus.getInProgressCount(); | |||
} | |||
@Override | |||
public long getErrorCount() { | |||
return queueStatus.getErrorCount(); | |||
} | |||
@Override | |||
public long getSuccessCount() { | |||
return queueStatus.getSuccessCount(); | |||
} | |||
} |
@@ -0,0 +1,48 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.monitoring; | |||
public interface ComputeEngineQueueMonitorMBean { | |||
/** | |||
* Count of received batch reports since instance startup | |||
*/ | |||
long getReceivedCount(); | |||
/** | |||
* Count of batch report waiting for processing since startup, including reports received before instance startup. | |||
*/ | |||
long getPendingCount(); | |||
/** | |||
* Count of batch report under processing. | |||
*/ | |||
long getInProgressCount(); | |||
/** | |||
* Count of batch report which processing ended with an error since instance startup. | |||
*/ | |||
long getErrorCount(); | |||
/** | |||
* Count of batch report which processing ended successfully since instance startup. | |||
*/ | |||
long getSuccessCount(); | |||
} |
@@ -20,6 +20,7 @@ | |||
package org.sonar.server.computation.ws; | |||
import java.io.InputStream; | |||
import org.apache.commons.io.IOUtils; | |||
import org.sonar.api.server.ws.Request; | |||
import org.sonar.api.server.ws.Response; | |||
@@ -27,10 +28,9 @@ import org.sonar.api.server.ws.WebService; | |||
import org.sonar.core.permission.GlobalPermissions; | |||
import org.sonar.server.computation.ComputationThreadLauncher; | |||
import org.sonar.server.computation.ReportQueue; | |||
import org.sonar.server.computation.monitoring.CEQueueStatus; | |||
import org.sonar.server.user.UserSession; | |||
import java.io.InputStream; | |||
public class SubmitReportAction implements ComputationWsAction { | |||
public static final String ACTION = "submit_report"; | |||
@@ -41,11 +41,13 @@ public class SubmitReportAction implements ComputationWsAction { | |||
private final ReportQueue queue; | |||
private final ComputationThreadLauncher workerLauncher; | |||
private final UserSession userSession; | |||
private final CEQueueStatus queueStatus; | |||
public SubmitReportAction(ReportQueue queue, ComputationThreadLauncher workerLauncher, UserSession userSession) { | |||
public SubmitReportAction(ReportQueue queue, ComputationThreadLauncher workerLauncher, UserSession userSession, CEQueueStatus queueStatus) { | |||
this.queue = queue; | |||
this.workerLauncher = workerLauncher; | |||
this.userSession = userSession; | |||
this.queueStatus = queueStatus; | |||
} | |||
@Override | |||
@@ -82,6 +84,7 @@ public class SubmitReportAction implements ComputationWsAction { | |||
InputStream reportData = request.paramAsInputStream(PARAM_REPORT_DATA); | |||
try { | |||
ReportQueue.Item item = queue.add(projectKey, projectName, reportData); | |||
queueStatus.addReceived(); | |||
workerLauncher.startAnalysisTaskNow(); | |||
response.newJsonWriter() | |||
.beginObject() |
@@ -67,6 +67,8 @@ import org.sonar.server.component.ws.EventsWs; | |||
import org.sonar.server.component.ws.ResourcesWs; | |||
import org.sonar.server.computation.ComputationThreadLauncher; | |||
import org.sonar.server.computation.ReportQueue; | |||
import org.sonar.server.computation.monitoring.CEQueueStatusImpl; | |||
import org.sonar.server.computation.monitoring.ComputeEngineQueueMonitor; | |||
import org.sonar.server.computation.ws.ComputationWs; | |||
import org.sonar.server.computation.ws.HistoryAction; | |||
import org.sonar.server.computation.ws.IsQueueEmptyWs; | |||
@@ -721,6 +723,8 @@ public class PlatformLevel4 extends PlatformLevel { | |||
PluginsWs.class, | |||
// Compute engine | |||
CEQueueStatusImpl.class, | |||
ComputeEngineQueueMonitor.class, | |||
ReportQueue.class, | |||
ComputationThreadLauncher.class, | |||
ComputationWs.class, |
@@ -33,6 +33,7 @@ import org.sonar.api.utils.log.LoggerLevel; | |||
import org.sonar.db.compute.AnalysisReportDto; | |||
import org.sonar.db.compute.AnalysisReportDto.Status; | |||
import org.sonar.server.computation.activity.ActivityManager; | |||
import org.sonar.server.computation.monitoring.CEQueueStatus; | |||
import org.sonar.server.computation.step.ComputationStep; | |||
import org.sonar.server.computation.step.ComputationSteps; | |||
@@ -41,6 +42,7 @@ import static org.junit.Assert.fail; | |||
import static org.mockito.Mockito.doThrow; | |||
import static org.mockito.Mockito.mock; | |||
import static org.mockito.Mockito.verify; | |||
import static org.mockito.Mockito.verifyNoMoreInteractions; | |||
import static org.mockito.Mockito.when; | |||
public class ComputationServiceTest { | |||
@@ -53,12 +55,13 @@ public class ComputationServiceTest { | |||
ComputationSteps steps = mock(ComputationSteps.class); | |||
ActivityManager activityManager = mock(ActivityManager.class); | |||
System2 system = mock(System2.class); | |||
CEQueueStatus queueStatus = mock(CEQueueStatus.class); | |||
AnalysisReportDto dto = AnalysisReportDto.newForTests(1L).setProjectKey("P1").setUuid("U1").setStatus(Status.PENDING); | |||
ComputationService underTest; | |||
@Before | |||
public void setUp() { | |||
underTest = new ComputationService(new ReportQueue.Item(dto, new File("Do_not_care")), steps, activityManager, system); | |||
underTest = new ComputationService(new ReportQueue.Item(dto, new File("Do_not_care")), steps, activityManager, system, queueStatus); | |||
} | |||
@Test | |||
@@ -82,6 +85,10 @@ public class ComputationServiceTest { | |||
verify(projectStep1).execute(); | |||
verify(projectStep2).execute(); | |||
verify(activityManager).saveActivity(dto); | |||
verify(queueStatus).addInProgress(); | |||
verify(queueStatus).addSuccess(); | |||
verifyNoMoreInteractions(queueStatus); | |||
} | |||
@Test | |||
@@ -107,12 +114,16 @@ public class ComputationServiceTest { | |||
assertThat(e.getMessage()).isEqualTo(errorMessage); | |||
assertThat(dto.getStatus()).isEqualTo(Status.FAILED); | |||
assertThat(dto.getFinishedAt()).isNotNull(); | |||
verify(queueStatus).addInProgress(); | |||
verify(queueStatus).addError(); | |||
verifyNoMoreInteractions(queueStatus); | |||
} | |||
} | |||
@Test | |||
public void step_error() { | |||
when(steps.instances()).thenReturn(Arrays.asList(projectStep1)); | |||
when(steps.instances()).thenReturn(Collections.singleton(projectStep1)); | |||
doThrow(new IllegalStateException("pb")).when(projectStep1).execute(); | |||
try { | |||
@@ -122,6 +133,10 @@ public class ComputationServiceTest { | |||
assertThat(e.getMessage()).isEqualTo("pb"); | |||
assertThat(dto.getStatus()).isEqualTo(Status.FAILED); | |||
assertThat(dto.getFinishedAt()).isNotNull(); | |||
verify(queueStatus).addInProgress(); | |||
verify(queueStatus).addError(); | |||
verifyNoMoreInteractions(queueStatus); | |||
} | |||
} | |||
@@ -36,10 +36,13 @@ import org.sonar.db.DbClient; | |||
import org.sonar.db.DbTester; | |||
import org.sonar.db.compute.AnalysisReportDto; | |||
import org.sonar.process.ProcessProperties; | |||
import org.sonar.server.computation.monitoring.CEQueueStatus; | |||
import org.sonar.test.DbTests; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.mockito.Mockito.mock; | |||
import static org.mockito.Mockito.verify; | |||
import static org.mockito.Mockito.verifyNoMoreInteractions; | |||
import static org.mockito.Mockito.when; | |||
import static org.sonar.db.compute.AnalysisReportDto.Status.PENDING; | |||
import static org.sonar.db.compute.AnalysisReportDto.Status.WORKING; | |||
@@ -54,12 +57,13 @@ public class ReportQueueTest { | |||
@Rule | |||
public DbTester db = DbTester.create(system); | |||
DbClient dbClient = db.getDbClient(); | |||
@Rule | |||
public TemporaryFolder temp = new TemporaryFolder(); | |||
DbClient dbClient = db.getDbClient(); | |||
Settings settings = new Settings(); | |||
CEQueueStatus queueStatus = mock(CEQueueStatus.class); | |||
File dataDir; | |||
ReportQueue underTest; | |||
@@ -69,7 +73,19 @@ public class ReportQueueTest { | |||
settings.setProperty(ProcessProperties.PATH_DATA, dataDir.getAbsolutePath()); | |||
when(system.now()).thenReturn(NOW); | |||
underTest = new ReportQueue(dbClient, settings); | |||
underTest = new ReportQueue(dbClient, settings, queueStatus); | |||
} | |||
@Test | |||
public void starts_initializes_count_of_pending_reports() { | |||
underTest.add("P1", "Project 1", generateData()); | |||
underTest.add("P2", "Project 2", generateData()); | |||
underTest.add("P3", "Project 3", generateData()); | |||
underTest.start(); | |||
verify(queueStatus).initPendingCount(3); | |||
verifyNoMoreInteractions(queueStatus); | |||
} | |||
@Test |
@@ -0,0 +1,110 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.monitoring; | |||
import java.util.ArrayList; | |||
import java.util.Collections; | |||
import java.util.List; | |||
import java.util.concurrent.ExecutorService; | |||
import java.util.concurrent.Executors; | |||
import java.util.concurrent.ThreadFactory; | |||
import java.util.concurrent.TimeUnit; | |||
import org.junit.After; | |||
import org.junit.Test; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
public class CEQueueStatusImplConcurrentTest { | |||
private ExecutorService executorService = Executors.newFixedThreadPool(10, new ThreadFactory() { | |||
private int cnt = 0; | |||
@Override | |||
public Thread newThread(Runnable r) { | |||
return new Thread(r, CEQueueStatusImplConcurrentTest.class.getSimpleName() + cnt++); | |||
} | |||
}); | |||
private CEQueueStatusImpl underTest = new CEQueueStatusImpl(); | |||
@After | |||
public void tearDown() throws Exception { | |||
executorService.shutdownNow(); | |||
} | |||
@Test | |||
public void test_concurrent_modifications_in_any_order() throws InterruptedException { | |||
for (Runnable runnable : buildShuffleCallsToUnderTest()) { | |||
executorService.submit(runnable); | |||
} | |||
executorService.awaitTermination(5, TimeUnit.SECONDS); | |||
assertThat(underTest.getReceivedCount()).isEqualTo(100); | |||
assertThat(underTest.getPendingCount()).isEqualTo(2); | |||
assertThat(underTest.getInProgressCount()).isEqualTo(1); | |||
assertThat(underTest.getErrorCount()).isEqualTo(17); | |||
assertThat(underTest.getSuccessCount()).isEqualTo(80); | |||
} | |||
private List<Runnable> buildShuffleCallsToUnderTest() { | |||
List<Runnable> res = new ArrayList<>(); | |||
for (int i = 0; i < 100; i++) { | |||
res.add(new AddReceivedRunnable()); | |||
} | |||
for (int i = 0; i < 98; i++) { | |||
res.add(new AddInProgressRunnable()); | |||
} | |||
for (int i = 0; i < 80; i++) { | |||
res.add(new AddSuccessRunnable()); | |||
} | |||
for (int i = 0; i < 17; i++) { | |||
res.add(new AddErrorRunnable()); | |||
} | |||
Collections.shuffle(res); | |||
return res; | |||
} | |||
private class AddReceivedRunnable implements Runnable { | |||
@Override | |||
public void run() { | |||
underTest.addReceived(); | |||
} | |||
} | |||
private class AddInProgressRunnable implements Runnable { | |||
@Override | |||
public void run() { | |||
underTest.addInProgress(); | |||
} | |||
} | |||
private class AddErrorRunnable implements Runnable { | |||
@Override | |||
public void run() { | |||
underTest.addError(); | |||
} | |||
} | |||
private class AddSuccessRunnable implements Runnable { | |||
@Override | |||
public void run() { | |||
underTest.addSuccess(); | |||
} | |||
} | |||
} |
@@ -0,0 +1,175 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.monitoring; | |||
import java.util.Random; | |||
import org.junit.Rule; | |||
import org.junit.Test; | |||
import org.junit.rules.ExpectedException; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
public class CEQueueStatusImplTest { | |||
private static final String ISE_initPendingCount_CALL_MSG = "Method initPendingCount must be used before any other method and can not be called twice"; | |||
private static final int SOME_RANDOM_MAX = 96535; | |||
@Rule | |||
public ExpectedException expectedException = ExpectedException.none(); | |||
private CEQueueStatusImpl underTest = new CEQueueStatusImpl(); | |||
@Test | |||
public void verify_just_created_instance_metrics() { | |||
assertThat(underTest.getReceivedCount()).isEqualTo(0); | |||
assertThat(underTest.getPendingCount()).isEqualTo(0); | |||
assertThat(underTest.getInProgressCount()).isEqualTo(0); | |||
assertThat(underTest.getErrorCount()).isEqualTo(0); | |||
assertThat(underTest.getSuccessCount()).isEqualTo(0); | |||
} | |||
@Test | |||
public void initPendingCount_sets_value_of_pendingCount() { | |||
underTest.initPendingCount(10); | |||
assertThat(underTest.getPendingCount()).isEqualTo(10); | |||
} | |||
@Test | |||
public void initPendingCount_throws_ISE_if_called_twice() { | |||
expectISEForIllegalCallToInitPendingCount(); | |||
underTest.initPendingCount(10); | |||
underTest.initPendingCount(10); | |||
} | |||
@Test | |||
public void initPendingCount_throws_ISE_if_called_after_getPendingCount() { | |||
expectISEForIllegalCallToInitPendingCount(); | |||
underTest.getPendingCount(); | |||
underTest.initPendingCount(10); | |||
} | |||
@Test | |||
public void initPendingCount_throws_ISE_if_called_after_addReceived() { | |||
expectISEForIllegalCallToInitPendingCount(); | |||
underTest.addReceived(); | |||
underTest.initPendingCount(10); | |||
} | |||
@Test | |||
public void initPendingCount_throws_ISE_if_called_after_addInProgress() { | |||
expectISEForIllegalCallToInitPendingCount(); | |||
underTest.addInProgress(); | |||
underTest.initPendingCount(10); | |||
} | |||
private void expectISEForIllegalCallToInitPendingCount() { | |||
expectedException.expect(IllegalStateException.class); | |||
expectedException.expectMessage(ISE_initPendingCount_CALL_MSG); | |||
} | |||
@Test | |||
public void addReceived_sets_received_and_pending_counts_to_1_when_initPendingCount_has_not_been_called() { | |||
underTest.addReceived(); | |||
assertThat(underTest.getReceivedCount()).isEqualTo(1); | |||
assertThat(underTest.getPendingCount()).isEqualTo(1); | |||
} | |||
@Test | |||
public void addReceived_any_number_of_call_adds_1_per_call() { | |||
int calls = new Random().nextInt(SOME_RANDOM_MAX); | |||
for (int i = 0; i < calls; i++) { | |||
underTest.addReceived(); | |||
} | |||
assertThat(underTest.getReceivedCount()).isEqualTo(calls); | |||
assertThat(underTest.getPendingCount()).isEqualTo(calls); | |||
} | |||
@Test | |||
public void addInProgress_increases_InProgress_and_decreases_Pending_by_1_without_check_on_Pending() { | |||
underTest.addInProgress(); | |||
assertThat(underTest.getReceivedCount()).isEqualTo(0); | |||
assertThat(underTest.getPendingCount()).isEqualTo(-1); | |||
assertThat(underTest.getInProgressCount()).isEqualTo(1); | |||
assertThat(underTest.getErrorCount()).isEqualTo(0); | |||
assertThat(underTest.getSuccessCount()).isEqualTo(0); | |||
} | |||
@Test | |||
public void addInProgress_any_number_of_call_change_by_1_per_call() { | |||
int calls = new Random().nextInt(SOME_RANDOM_MAX); | |||
for (int i = 0; i < calls; i++) { | |||
underTest.addInProgress(); | |||
} | |||
assertThat(underTest.getInProgressCount()).isEqualTo(calls); | |||
assertThat(underTest.getPendingCount()).isEqualTo(-calls); | |||
} | |||
@Test | |||
public void addError_increases_Error_and_decreases_InProgress_by_1_without_check_on_InProgress() { | |||
underTest.addError(); | |||
assertThat(underTest.getReceivedCount()).isEqualTo(0); | |||
assertThat(underTest.getPendingCount()).isEqualTo(0); | |||
assertThat(underTest.getInProgressCount()).isEqualTo(-1); | |||
assertThat(underTest.getErrorCount()).isEqualTo(1); | |||
assertThat(underTest.getSuccessCount()).isEqualTo(0); | |||
} | |||
@Test | |||
public void addError_any_number_of_call_change_by_1_per_call() { | |||
int calls = new Random().nextInt(SOME_RANDOM_MAX); | |||
for (int i = 0; i < calls; i++) { | |||
underTest.addError(); | |||
} | |||
assertThat(underTest.getErrorCount()).isEqualTo(calls); | |||
assertThat(underTest.getInProgressCount()).isEqualTo(-calls); | |||
} | |||
@Test | |||
public void addSuccess_increases_Error_and_decreases_InProgress_by_1_without_check_on_InProgress() { | |||
underTest.addSuccess(); | |||
assertThat(underTest.getReceivedCount()).isEqualTo(0); | |||
assertThat(underTest.getPendingCount()).isEqualTo(0); | |||
assertThat(underTest.getInProgressCount()).isEqualTo(-1); | |||
assertThat(underTest.getErrorCount()).isEqualTo(0); | |||
assertThat(underTest.getSuccessCount()).isEqualTo(1); | |||
} | |||
@Test | |||
public void addSuccess_any_number_of_call_change_by_1_per_call() { | |||
int calls = new Random().nextInt(SOME_RANDOM_MAX); | |||
for (int i = 0; i < calls; i++) { | |||
underTest.addSuccess(); | |||
} | |||
assertThat(underTest.getSuccessCount()).isEqualTo(calls); | |||
assertThat(underTest.getInProgressCount()).isEqualTo(-calls); | |||
} | |||
} |
@@ -0,0 +1,120 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.monitoring; | |||
import org.junit.Test; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.assertj.core.api.Assertions.entry; | |||
public class ComputeEngineQueueMonitorTest { | |||
private static final long RECEIVED_COUNT = 30; | |||
private static final long PENDING_COUNT = 2; | |||
private static final long IN_PROGRESS_COUNT = 5; | |||
private static final long ERROR_COUNT = 10; | |||
private static final long SUCCESS_COUNT = 13; | |||
private ComputeEngineQueueMonitor underTest = new ComputeEngineQueueMonitor(new DumbCEQueueStatus()); | |||
@Test | |||
public void name_is_ComputeEngineQueue() { | |||
assertThat(underTest.name()).isEqualTo("ComputeEngineQueue"); | |||
} | |||
@Test | |||
public void attributes_has_entry_for_each_get_method() { | |||
assertThat(underTest.attributes()).containsOnly( | |||
entry("Received reports", RECEIVED_COUNT), | |||
entry("Pending reports", PENDING_COUNT), | |||
entry("In progress", IN_PROGRESS_COUNT), | |||
entry("Successfully processed", SUCCESS_COUNT), | |||
entry("Processed with error", ERROR_COUNT)); | |||
} | |||
@Test | |||
public void get_methods_delegate_to_the_CEQueueStatus_instance() { | |||
assertThat(underTest.getReceivedCount()).isEqualTo(RECEIVED_COUNT); | |||
assertThat(underTest.getPendingCount()).isEqualTo(PENDING_COUNT); | |||
assertThat(underTest.getInProgressCount()).isEqualTo(IN_PROGRESS_COUNT); | |||
assertThat(underTest.getErrorCount()).isEqualTo(ERROR_COUNT); | |||
assertThat(underTest.getSuccessCount()).isEqualTo(SUCCESS_COUNT); | |||
} | |||
/** | |||
* Dumb implementation of CEQueueStatus which returns constant values for get methods and throws UnsupportedOperationException | |||
* for other methods. | |||
*/ | |||
private static class DumbCEQueueStatus implements CEQueueStatus { | |||
@Override | |||
public long addReceived() { | |||
return methodNotImplemented(); | |||
} | |||
private long methodNotImplemented() { | |||
throw new UnsupportedOperationException("Not Implemented"); | |||
} | |||
@Override | |||
public long getReceivedCount() { | |||
return RECEIVED_COUNT; | |||
} | |||
@Override | |||
public long initPendingCount(long initialPendingCount) { | |||
return methodNotImplemented(); | |||
} | |||
@Override | |||
public long getPendingCount() { | |||
return PENDING_COUNT; | |||
} | |||
@Override | |||
public long addInProgress() { | |||
return methodNotImplemented(); | |||
} | |||
@Override | |||
public long getInProgressCount() { | |||
return IN_PROGRESS_COUNT; | |||
} | |||
@Override | |||
public long addError() { | |||
return methodNotImplemented(); | |||
} | |||
@Override | |||
public long getErrorCount() { | |||
return ERROR_COUNT; | |||
} | |||
@Override | |||
public long addSuccess() { | |||
return methodNotImplemented(); | |||
} | |||
@Override | |||
public long getSuccessCount() { | |||
return SUCCESS_COUNT; | |||
} | |||
} | |||
} |
@@ -25,6 +25,7 @@ import org.sonar.api.server.ws.WebService; | |||
import org.sonar.server.activity.index.ActivityIndex; | |||
import org.sonar.server.computation.ComputationThreadLauncher; | |||
import org.sonar.server.computation.ReportQueue; | |||
import org.sonar.server.computation.monitoring.CEQueueStatus; | |||
import org.sonar.server.user.UserSession; | |||
import org.sonar.server.ws.WsTester; | |||
@@ -35,7 +36,7 @@ public class ComputationWsTest { | |||
WsTester ws = new WsTester(new ComputationWs( | |||
new QueueAction(mock(ReportQueue.class)), | |||
new SubmitReportAction(mock(ReportQueue.class), mock(ComputationThreadLauncher.class), mock(UserSession.class)), | |||
new SubmitReportAction(mock(ReportQueue.class), mock(ComputationThreadLauncher.class), mock(UserSession.class), mock(CEQueueStatus.class)), | |||
new HistoryAction(mock(ActivityIndex.class), mock(UserSession.class)))); | |||
@Test |
@@ -20,20 +20,20 @@ | |||
package org.sonar.server.computation.ws; | |||
import java.io.InputStream; | |||
import org.junit.Before; | |||
import org.junit.Rule; | |||
import org.junit.Test; | |||
import org.sonar.api.server.ws.WebService; | |||
import org.sonar.db.compute.AnalysisReportDto; | |||
import org.sonar.core.permission.GlobalPermissions; | |||
import org.sonar.db.compute.AnalysisReportDto; | |||
import org.sonar.server.computation.ComputationThreadLauncher; | |||
import org.sonar.server.computation.ReportQueue; | |||
import org.sonar.server.computation.monitoring.CEQueueStatus; | |||
import org.sonar.server.exceptions.ForbiddenException; | |||
import org.sonar.server.tester.UserSessionRule; | |||
import org.sonar.server.ws.WsTester; | |||
import java.io.InputStream; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.mockito.Mockito.any; | |||
import static org.mockito.Mockito.eq; | |||
@@ -47,13 +47,14 @@ public class SubmitReportActionTest { | |||
public UserSessionRule userSessionRule = UserSessionRule.standalone(); | |||
ComputationThreadLauncher workerLauncher = mock(ComputationThreadLauncher.class); | |||
CEQueueStatus queueStatus = mock(CEQueueStatus.class); | |||
ReportQueue queue = mock(ReportQueue.class); | |||
WsTester wsTester; | |||
SubmitReportAction underTest; | |||
@Before | |||
public void before() { | |||
underTest = new SubmitReportAction(queue, workerLauncher, userSessionRule); | |||
underTest = new SubmitReportAction(queue, workerLauncher, userSessionRule, queueStatus); | |||
wsTester = new WsTester(new ComputationWs(underTest)); | |||
} | |||
@@ -85,6 +86,7 @@ public class SubmitReportActionTest { | |||
verify(queue).add(eq("P1"), eq("Project 1"), any(InputStream.class)); | |||
verify(workerLauncher).startAnalysisTaskNow(); | |||
verify(queueStatus).addReceived(); | |||
assertThat(response.outputAsString()).isEqualTo("{\"key\":\"42\"}"); | |||
} | |||
@@ -69,6 +69,10 @@ public class AnalysisReportDao implements Dao { | |||
return tryToPop(session, reportId); | |||
} | |||
public long countPending(DbSession session) { | |||
return mapper(session).selectAvailables(PENDING, WORKING).size(); | |||
} | |||
@VisibleForTesting | |||
AnalysisReportDto tryToPop(DbSession session, long reportId) { | |||
AnalysisReportMapper mapper = mapper(session); |
@@ -128,6 +128,13 @@ public class AnalysisReportDaoTest { | |||
assertThat(nextAvailableReport.getProjectKey()).isEqualTo("P2"); | |||
} | |||
@Test | |||
public void count_pending() { | |||
db.prepareDbUnit(getClass(), "pop_oldest_pending.xml"); | |||
assertThat(underTest.countPending(db.getSession())).isEqualTo(2); | |||
} | |||
@Test | |||
public void pop_null_if_no_pending_reports() { | |||
db.prepareDbUnit(getClass(), "pop_null_if_no_pending_reports.xml"); |