@@ -69,15 +69,20 @@ public class ComputationService { | |||
timingSum += stepProfiler.stopInfo(step.getDescription()); | |||
} | |||
item.dto.setStatus(SUCCESS); | |||
queueStatus.addSuccess(); | |||
long timing = logProcessingEnd(message, profiler, timingSum); | |||
queueStatus.addSuccess(timing); | |||
} catch (Throwable e) { | |||
item.dto.setStatus(FAILED); | |||
queueStatus.addError(); | |||
long timing = logProcessingEnd(message, profiler, timingSum); | |||
queueStatus.addError(timing); | |||
throw Throwables.propagate(e); | |||
} finally { | |||
item.dto.setFinishedAt(system.now()); | |||
activityManager.saveActivity(item.dto); | |||
profiler.stopInfo(format("%s total time spent in steps=%sms", message, timingSum)); | |||
} | |||
} | |||
private static long logProcessingEnd(String message, Profiler profiler, long timingSum) { | |||
return profiler.stopInfo(format("%s total time spent in steps=%sms", message, timingSum)); | |||
} | |||
} |
@@ -71,14 +71,18 @@ public interface CEQueueStatus { | |||
/** | |||
* Adds 1 to the count of batch reports which processing ended with an error and removes 1 from the count of batch | |||
* reports under processing. | |||
* reports under processing. Adds the specified time to the processing time counter. | |||
* | |||
* @param processingTime duration of processing in ms | |||
* | |||
* @return the new count of batch reports which processing ended with an error | |||
* | |||
* @see #getErrorCount() | |||
* @see #getInProgressCount() | |||
* | |||
* @throws IllegalArgumentException if processingTime is < 0 | |||
*/ | |||
long addError(); | |||
long addError(long processingTime); | |||
/** | |||
* Count of batch reports which processing ended with an error since instance startup. | |||
@@ -87,17 +91,26 @@ public interface CEQueueStatus { | |||
/** | |||
* Adds 1 to the count of batch reports which processing ended successfully and removes 1 from the count of batch | |||
* reports under processing. | |||
* reports under processing. Adds the specified time to the processing time counter. | |||
* | |||
* @param processingTime duration of processing in ms | |||
* | |||
* @return the new count of batch reports which processing ended successfully | |||
* | |||
* @see #getSuccessCount() | |||
* @see #getInProgressCount() | |||
* | |||
* @throws IllegalArgumentException if processingTime is < 0 | |||
*/ | |||
long addSuccess(); | |||
long addSuccess(long processingTime); | |||
/** | |||
* Count of batch reports which processing ended successfully since instance startup. | |||
*/ | |||
long getSuccessCount(); | |||
/** | |||
* Time spent processing batch reports since startup. | |||
*/ | |||
long getProcessingTime(); | |||
} |
@@ -31,6 +31,7 @@ public class CEQueueStatusImpl implements CEQueueStatus { | |||
private final AtomicLong inProgress = new AtomicLong(0); | |||
private final AtomicLong error = new AtomicLong(0); | |||
private final AtomicLong success = new AtomicLong(0); | |||
private final AtomicLong processingTime = new AtomicLong(0); | |||
@Override | |||
public long addReceived() { | |||
@@ -80,7 +81,8 @@ public class CEQueueStatusImpl implements CEQueueStatus { | |||
} | |||
@Override | |||
public long addError() { | |||
public long addError(long processingTime) { | |||
addProcessingTime(processingTime); | |||
inProgress.decrementAndGet(); | |||
return error.incrementAndGet(); | |||
} | |||
@@ -91,13 +93,24 @@ public class CEQueueStatusImpl implements CEQueueStatus { | |||
} | |||
@Override | |||
public long addSuccess() { | |||
public long addSuccess(long processingTime) { | |||
addProcessingTime(processingTime); | |||
inProgress.decrementAndGet(); | |||
return success.incrementAndGet(); | |||
} | |||
private void addProcessingTime(long time) { | |||
checkArgument(time >= 0, "Processing time can not be < 0"); | |||
processingTime.addAndGet(time); | |||
} | |||
@Override | |||
public long getSuccessCount() { | |||
return success.get(); | |||
} | |||
@Override | |||
public long getProcessingTime() { | |||
return processingTime.get(); | |||
} | |||
} |
@@ -47,6 +47,7 @@ public class ComputeEngineQueueMonitor extends BaseMonitorMBean implements Compu | |||
attributes.put("In progress", getInProgressCount()); | |||
attributes.put("Successfully processed", getSuccessCount()); | |||
attributes.put("Processed with error", getErrorCount()); | |||
attributes.put("Processing time", getProcessingTime()); | |||
return attributes; | |||
} | |||
@@ -74,4 +75,9 @@ public class ComputeEngineQueueMonitor extends BaseMonitorMBean implements Compu | |||
public long getSuccessCount() { | |||
return queueStatus.getSuccessCount(); | |||
} | |||
@Override | |||
public long getProcessingTime() { | |||
return queueStatus.getProcessingTime(); | |||
} | |||
} |
@@ -27,22 +27,27 @@ public interface ComputeEngineQueueMonitorMBean { | |||
long getReceivedCount(); | |||
/** | |||
* Count of batch report waiting for processing since startup, including reports received before instance startup. | |||
* Count of batch reports waiting for processing since startup, including reports received before instance startup. | |||
*/ | |||
long getPendingCount(); | |||
/** | |||
* Count of batch report under processing. | |||
* Count of batch reports under processing. | |||
*/ | |||
long getInProgressCount(); | |||
/** | |||
* Count of batch report which processing ended with an error since instance startup. | |||
* Count of batch reports which processing ended with an error since instance startup. | |||
*/ | |||
long getErrorCount(); | |||
/** | |||
* Count of batch report which processing ended successfully since instance startup. | |||
* Count of batch reports which processing ended successfully since instance startup. | |||
*/ | |||
long getSuccessCount(); | |||
/** | |||
* Time spent processing reports since startup. | |||
*/ | |||
long getProcessingTime(); | |||
} |
@@ -39,6 +39,7 @@ import org.sonar.server.computation.step.ComputationSteps; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.junit.Assert.fail; | |||
import static org.mockito.Matchers.anyLong; | |||
import static org.mockito.Mockito.doThrow; | |||
import static org.mockito.Mockito.mock; | |||
import static org.mockito.Mockito.verify; | |||
@@ -87,7 +88,7 @@ public class ComputationServiceTest { | |||
verify(activityManager).saveActivity(dto); | |||
verify(queueStatus).addInProgress(); | |||
verify(queueStatus).addSuccess(); | |||
verify(queueStatus).addSuccess(anyLong()); | |||
verifyNoMoreInteractions(queueStatus); | |||
} | |||
@@ -116,7 +117,7 @@ public class ComputationServiceTest { | |||
assertThat(dto.getFinishedAt()).isNotNull(); | |||
verify(queueStatus).addInProgress(); | |||
verify(queueStatus).addError(); | |||
verify(queueStatus).addError(anyLong()); | |||
verifyNoMoreInteractions(queueStatus); | |||
} | |||
} | |||
@@ -135,7 +136,7 @@ public class ComputationServiceTest { | |||
assertThat(dto.getFinishedAt()).isNotNull(); | |||
verify(queueStatus).addInProgress(); | |||
verify(queueStatus).addError(); | |||
verify(queueStatus).addError(anyLong()); | |||
verifyNoMoreInteractions(queueStatus); | |||
} | |||
} |
@@ -60,6 +60,7 @@ public class CEQueueStatusImplConcurrentTest { | |||
assertThat(underTest.getInProgressCount()).isEqualTo(1); | |||
assertThat(underTest.getErrorCount()).isEqualTo(17); | |||
assertThat(underTest.getSuccessCount()).isEqualTo(80); | |||
assertThat(underTest.getProcessingTime()).isEqualTo(177); | |||
} | |||
private List<Runnable> buildShuffleCallsToUnderTest() { | |||
@@ -97,14 +98,14 @@ public class CEQueueStatusImplConcurrentTest { | |||
private class AddErrorRunnable implements Runnable { | |||
@Override | |||
public void run() { | |||
underTest.addError(); | |||
underTest.addError(1); | |||
} | |||
} | |||
private class AddSuccessRunnable implements Runnable { | |||
@Override | |||
public void run() { | |||
underTest.addSuccess(); | |||
underTest.addSuccess(2); | |||
} | |||
} | |||
} |
@@ -29,6 +29,7 @@ import static org.assertj.core.api.Assertions.assertThat; | |||
public class CEQueueStatusImplTest { | |||
private static final String ISE_initPendingCount_CALL_MSG = "Method initPendingCount must be used before any other method and can not be called twice"; | |||
private static final int SOME_RANDOM_MAX = 96535; | |||
private static final int SOME_PROCESSING_TIME = 8723; | |||
@Rule | |||
public ExpectedException expectedException = ExpectedException.none(); | |||
@@ -42,6 +43,7 @@ public class CEQueueStatusImplTest { | |||
assertThat(underTest.getInProgressCount()).isEqualTo(0); | |||
assertThat(underTest.getErrorCount()).isEqualTo(0); | |||
assertThat(underTest.getSuccessCount()).isEqualTo(0); | |||
assertThat(underTest.getProcessingTime()).isEqualTo(0); | |||
} | |||
@Test | |||
@@ -116,6 +118,7 @@ public class CEQueueStatusImplTest { | |||
assertThat(underTest.getInProgressCount()).isEqualTo(1); | |||
assertThat(underTest.getErrorCount()).isEqualTo(0); | |||
assertThat(underTest.getSuccessCount()).isEqualTo(0); | |||
assertThat(underTest.getProcessingTime()).isEqualTo(0); | |||
} | |||
@Test | |||
@@ -127,49 +130,70 @@ public class CEQueueStatusImplTest { | |||
assertThat(underTest.getInProgressCount()).isEqualTo(calls); | |||
assertThat(underTest.getPendingCount()).isEqualTo(-calls); | |||
assertThat(underTest.getProcessingTime()).isEqualTo(0); | |||
} | |||
@Test | |||
public void addError_throws_IAE_if_time_is_less_than_0() { | |||
expectedException.expect(IllegalArgumentException.class); | |||
expectedException.expectMessage("Processing time can not be < 0"); | |||
underTest.addError(-1); | |||
} | |||
@Test | |||
public void addError_increases_Error_and_decreases_InProgress_by_1_without_check_on_InProgress() { | |||
underTest.addError(); | |||
underTest.addError(SOME_PROCESSING_TIME); | |||
assertThat(underTest.getReceivedCount()).isEqualTo(0); | |||
assertThat(underTest.getPendingCount()).isEqualTo(0); | |||
assertThat(underTest.getInProgressCount()).isEqualTo(-1); | |||
assertThat(underTest.getErrorCount()).isEqualTo(1); | |||
assertThat(underTest.getSuccessCount()).isEqualTo(0); | |||
assertThat(underTest.getProcessingTime()).isEqualTo(SOME_PROCESSING_TIME); | |||
} | |||
@Test | |||
public void addError_any_number_of_call_change_by_1_per_call() { | |||
int calls = new Random().nextInt(SOME_RANDOM_MAX); | |||
for (int i = 0; i < calls; i++) { | |||
underTest.addError(); | |||
underTest.addError(1); | |||
} | |||
assertThat(underTest.getErrorCount()).isEqualTo(calls); | |||
assertThat(underTest.getInProgressCount()).isEqualTo(-calls); | |||
assertThat(underTest.getProcessingTime()).isEqualTo(calls); | |||
} | |||
@Test | |||
public void addSuccess_throws_IAE_if_time_is_less_than_0() { | |||
expectedException.expect(IllegalArgumentException.class); | |||
expectedException.expectMessage("Processing time can not be < 0"); | |||
underTest.addSuccess(-1); | |||
} | |||
@Test | |||
public void addSuccess_increases_Error_and_decreases_InProgress_by_1_without_check_on_InProgress() { | |||
underTest.addSuccess(); | |||
underTest.addSuccess(SOME_PROCESSING_TIME); | |||
assertThat(underTest.getReceivedCount()).isEqualTo(0); | |||
assertThat(underTest.getPendingCount()).isEqualTo(0); | |||
assertThat(underTest.getInProgressCount()).isEqualTo(-1); | |||
assertThat(underTest.getErrorCount()).isEqualTo(0); | |||
assertThat(underTest.getSuccessCount()).isEqualTo(1); | |||
assertThat(underTest.getProcessingTime()).isEqualTo(SOME_PROCESSING_TIME); | |||
} | |||
@Test | |||
public void addSuccess_any_number_of_call_change_by_1_per_call() { | |||
int calls = new Random().nextInt(SOME_RANDOM_MAX); | |||
for (int i = 0; i < calls; i++) { | |||
underTest.addSuccess(); | |||
underTest.addSuccess(1); | |||
} | |||
assertThat(underTest.getSuccessCount()).isEqualTo(calls); | |||
assertThat(underTest.getInProgressCount()).isEqualTo(-calls); | |||
assertThat(underTest.getProcessingTime()).isEqualTo(calls); | |||
} | |||
} |
@@ -32,6 +32,7 @@ public class ComputeEngineQueueMonitorTest { | |||
private static final long IN_PROGRESS_COUNT = 5; | |||
private static final long ERROR_COUNT = 10; | |||
private static final long SUCCESS_COUNT = 13; | |||
private static final long PROCESSING_TIME = 987; | |||
private ComputeEngineQueueMonitor underTest = new ComputeEngineQueueMonitor(new DumbCEQueueStatus(), mock(ReportQueue.class)); | |||
@@ -43,11 +44,12 @@ public class ComputeEngineQueueMonitorTest { | |||
@Test | |||
public void attributes_has_entry_for_each_get_method() { | |||
assertThat(underTest.attributes()).containsOnly( | |||
entry("Received reports", RECEIVED_COUNT), | |||
entry("Pending reports", PENDING_COUNT), | |||
entry("Received", RECEIVED_COUNT), | |||
entry("Pending", PENDING_COUNT), | |||
entry("In progress", IN_PROGRESS_COUNT), | |||
entry("Successfully processed", SUCCESS_COUNT), | |||
entry("Processed with error", ERROR_COUNT)); | |||
entry("Processed with error", ERROR_COUNT), | |||
entry("Processing time", PROCESSING_TIME)); | |||
} | |||
@Test | |||
@@ -57,6 +59,7 @@ public class ComputeEngineQueueMonitorTest { | |||
assertThat(underTest.getInProgressCount()).isEqualTo(IN_PROGRESS_COUNT); | |||
assertThat(underTest.getErrorCount()).isEqualTo(ERROR_COUNT); | |||
assertThat(underTest.getSuccessCount()).isEqualTo(SUCCESS_COUNT); | |||
assertThat(underTest.getProcessingTime()).isEqualTo(PROCESSING_TIME); | |||
} | |||
/** | |||
@@ -100,7 +103,7 @@ public class ComputeEngineQueueMonitorTest { | |||
} | |||
@Override | |||
public long addError() { | |||
public long addError(long processingTime) { | |||
return methodNotImplemented(); | |||
} | |||
@@ -110,7 +113,7 @@ public class ComputeEngineQueueMonitorTest { | |||
} | |||
@Override | |||
public long addSuccess() { | |||
public long addSuccess(long processingTime) { | |||
return methodNotImplemented(); | |||
} | |||
@@ -118,5 +121,10 @@ public class ComputeEngineQueueMonitorTest { | |||
public long getSuccessCount() { | |||
return SUCCESS_COUNT; | |||
} | |||
@Override | |||
public long getProcessingTime() { | |||
return PROCESSING_TIME; | |||
} | |||
} | |||
} |