and they should not required the CEQueueStatusImpl object to be initialized either this way, monitoring can query the metrics of CEQueueStatus through JMX even before the object is initializedtags/5.3-RC1
@@ -21,21 +21,6 @@ package org.sonar.server.computation.monitoring; | |||
public interface CEQueueStatus { | |||
/** | |||
* Adds 1 to the count of received batch reports and 1 to the count of batch reports waiting for processing. | |||
* | |||
* @return the new count of received batch reports | |||
* | |||
* @see #getReceivedCount() | |||
* @see #getPendingCount() | |||
*/ | |||
long addReceived(); | |||
/** | |||
* Count of received batch reports since instance startup | |||
*/ | |||
long getReceivedCount(); | |||
/** | |||
* Sets the count of reports waiting for processing at startup. This method can be called only once. | |||
* | |||
@@ -49,9 +34,16 @@ public interface CEQueueStatus { | |||
long initPendingCount(long initialPendingCount); | |||
/** | |||
* Count of batch reports waiting for processing since startup, including reports received before instance startup. | |||
* Adds 1 to the count of received batch reports and 1 to the count of batch reports waiting for processing. | |||
* | |||
* @return the new count of received batch reports | |||
* | |||
* @see #getReceivedCount() | |||
* @see #getPendingCount() | |||
* | |||
* @throws IllegalStateException if {@link #initPendingCount(long)} has not been called yet | |||
*/ | |||
long getPendingCount(); | |||
long addReceived(); | |||
/** | |||
* Adds 1 to the count of batch reports under processing and removes 1 from the count of batch reports waiting for | |||
@@ -61,13 +53,25 @@ public interface CEQueueStatus { | |||
* | |||
* @see #getInProgressCount() | |||
* @see #getPendingCount() | |||
* | |||
* @throws IllegalStateException if {@link #initPendingCount(long)} has not been called yet | |||
*/ | |||
long addInProgress(); | |||
/** | |||
* Count of batch reports under processing. | |||
* Adds 1 to the count of batch reports which processing ended successfully and removes 1 from the count of batch | |||
* reports under processing. Adds the specified time to the processing time counter. | |||
* | |||
* @param processingTime duration of processing in ms | |||
* | |||
* @return the new count of batch reports which processing ended successfully | |||
* | |||
* @see #getSuccessCount() | |||
* @see #getInProgressCount() | |||
* | |||
* @throws IllegalArgumentException if processingTime is < 0 | |||
*/ | |||
long getInProgressCount(); | |||
long addSuccess(long processingTime); | |||
/** | |||
* Adds 1 to the count of batch reports which processing ended with an error and removes 1 from the count of batch | |||
@@ -85,24 +89,24 @@ public interface CEQueueStatus { | |||
long addError(long processingTime); | |||
/** | |||
* Count of batch reports which processing ended with an error since instance startup. | |||
* Count of received batch reports since instance startup | |||
*/ | |||
long getErrorCount(); | |||
long getReceivedCount(); | |||
/** | |||
* Adds 1 to the count of batch reports which processing ended successfully and removes 1 from the count of batch | |||
* reports under processing. Adds the specified time to the processing time counter. | |||
* | |||
* @param processingTime duration of processing in ms | |||
* | |||
* @return the new count of batch reports which processing ended successfully | |||
* | |||
* @see #getSuccessCount() | |||
* @see #getInProgressCount() | |||
* | |||
* @throws IllegalArgumentException if processingTime is < 0 | |||
* Count of batch reports waiting for processing since startup, including reports received before instance startup. | |||
*/ | |||
long addSuccess(long processingTime); | |||
long getPendingCount(); | |||
/** | |||
* Count of batch reports under processing. | |||
*/ | |||
long getInProgressCount(); | |||
/** | |||
* Count of batch reports which processing ended with an error since instance startup. | |||
*/ | |||
long getErrorCount(); | |||
/** | |||
* Count of batch reports which processing ended successfully since instance startup. |
@@ -22,6 +22,7 @@ package org.sonar.server.computation.monitoring; | |||
import java.util.concurrent.atomic.AtomicLong; | |||
import static com.google.common.base.Preconditions.checkArgument; | |||
import static com.google.common.base.Preconditions.checkState; | |||
public class CEQueueStatusImpl implements CEQueueStatus { | |||
private static final long PENDING_INITIAL_VALUE = Long.MIN_VALUE; | |||
@@ -33,51 +34,33 @@ public class CEQueueStatusImpl implements CEQueueStatus { | |||
private final AtomicLong success = new AtomicLong(0); | |||
private final AtomicLong processingTime = new AtomicLong(0); | |||
@Override | |||
public long addReceived() { | |||
// initPendingCount might not have been called yet | |||
if (!pending.compareAndSet(PENDING_INITIAL_VALUE, 1)) { | |||
pending.incrementAndGet(); | |||
} | |||
return received.incrementAndGet(); | |||
} | |||
@Override | |||
public long getReceivedCount() { | |||
return received.get(); | |||
} | |||
@Override | |||
public long initPendingCount(long initialPendingCount) { | |||
checkArgument(initialPendingCount >= 0, "Initial pending count must be >= 0"); | |||
if (!pending.compareAndSet(PENDING_INITIAL_VALUE, initialPendingCount)) { | |||
throw new IllegalStateException("Method initPendingCount must be used before any other method and can not be called twice"); | |||
} | |||
checkState( | |||
pending.compareAndSet(PENDING_INITIAL_VALUE, initialPendingCount), | |||
"Method initPendingCount must be used before any other method and can not be called twice"); | |||
return initialPendingCount; | |||
} | |||
@Override | |||
public long getPendingCount() { | |||
ensurePendingIsInitialized(); | |||
public long addReceived() { | |||
ensurePendingInitialized("addReceived"); | |||
return pending.get(); | |||
pending.incrementAndGet(); | |||
return received.incrementAndGet(); | |||
} | |||
@Override | |||
public long addInProgress() { | |||
ensurePendingIsInitialized(); | |||
ensurePendingInitialized("addInProgress"); | |||
pending.decrementAndGet(); | |||
return inProgress.incrementAndGet(); | |||
} | |||
private void ensurePendingIsInitialized() { | |||
pending.compareAndSet(PENDING_INITIAL_VALUE, 0); | |||
} | |||
@Override | |||
public long getInProgressCount() { | |||
return inProgress.get(); | |||
private void ensurePendingInitialized(String methodName) { | |||
checkState(pending.get() != PENDING_INITIAL_VALUE, "Method initPendingCount must be used before %s can be called", methodName); | |||
} | |||
@Override | |||
@@ -87,11 +70,6 @@ public class CEQueueStatusImpl implements CEQueueStatus { | |||
return error.incrementAndGet(); | |||
} | |||
@Override | |||
public long getErrorCount() { | |||
return error.get(); | |||
} | |||
@Override | |||
public long addSuccess(long processingTime) { | |||
addProcessingTime(processingTime); | |||
@@ -104,6 +82,27 @@ public class CEQueueStatusImpl implements CEQueueStatus { | |||
processingTime.addAndGet(time); | |||
} | |||
@Override | |||
public long getReceivedCount() { | |||
return received.get(); | |||
} | |||
@Override | |||
public long getPendingCount() { | |||
long currentValue = pending.get(); | |||
return currentValue == PENDING_INITIAL_VALUE ? 0 : currentValue; | |||
} | |||
@Override | |||
public long getInProgressCount() { | |||
return inProgress.get(); | |||
} | |||
@Override | |||
public long getErrorCount() { | |||
return error.get(); | |||
} | |||
@Override | |||
public long getSuccessCount() { | |||
return success.get(); |
@@ -49,6 +49,9 @@ public class CEQueueStatusImplConcurrentTest { | |||
@Test | |||
public void test_concurrent_modifications_in_any_order() throws InterruptedException { | |||
long initialPendingCount = 9963L; | |||
underTest.initPendingCount(initialPendingCount); | |||
for (Runnable runnable : buildShuffleCallsToUnderTest()) { | |||
executorService.submit(runnable); | |||
} | |||
@@ -56,7 +59,7 @@ public class CEQueueStatusImplConcurrentTest { | |||
executorService.awaitTermination(1, TimeUnit.SECONDS); | |||
assertThat(underTest.getReceivedCount()).isEqualTo(100); | |||
assertThat(underTest.getPendingCount()).isEqualTo(2); | |||
assertThat(underTest.getPendingCount()).isEqualTo(initialPendingCount + 2); | |||
assertThat(underTest.getInProgressCount()).isEqualTo(1); | |||
assertThat(underTest.getErrorCount()).isEqualTo(17); | |||
assertThat(underTest.getSuccessCount()).isEqualTo(80); |
@@ -27,9 +27,9 @@ import org.junit.rules.ExpectedException; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
public class CEQueueStatusImplTest { | |||
private static final String ISE_initPendingCount_CALL_MSG = "Method initPendingCount must be used before any other method and can not be called twice"; | |||
private static final int SOME_RANDOM_MAX = 96535; | |||
private static final int SOME_PROCESSING_TIME = 8723; | |||
private static final long INITIAL_PENDING_COUNT = 996L; | |||
@Rule | |||
public ExpectedException expectedException = ExpectedException.none(); | |||
@@ -55,66 +55,60 @@ public class CEQueueStatusImplTest { | |||
@Test | |||
public void initPendingCount_throws_ISE_if_called_twice() { | |||
expectISEForIllegalCallToInitPendingCount(); | |||
expectedException.expect(IllegalStateException.class); | |||
expectedException.expectMessage("Method initPendingCount must be used before any other method and can not be called twice"); | |||
underTest.initPendingCount(10); | |||
underTest.initPendingCount(10); | |||
} | |||
@Test | |||
public void initPendingCount_throws_ISE_if_called_after_getPendingCount() { | |||
expectISEForIllegalCallToInitPendingCount(); | |||
underTest.getPendingCount(); | |||
underTest.initPendingCount(10); | |||
} | |||
@Test | |||
public void initPendingCount_throws_ISE_if_called_after_addReceived() { | |||
expectISEForIllegalCallToInitPendingCount(); | |||
public void addReceived_throws_ISE_if_called_before_initPendingCount() { | |||
expectedException.expect(IllegalStateException.class); | |||
expectedException.expectMessage("Method initPendingCount must be used before addReceived can be called"); | |||
underTest.addReceived(); | |||
underTest.initPendingCount(10); | |||
} | |||
@Test | |||
public void initPendingCount_throws_ISE_if_called_after_addInProgress() { | |||
expectISEForIllegalCallToInitPendingCount(); | |||
underTest.addInProgress(); | |||
underTest.initPendingCount(10); | |||
} | |||
private void expectISEForIllegalCallToInitPendingCount() { | |||
expectedException.expect(IllegalStateException.class); | |||
expectedException.expectMessage(ISE_initPendingCount_CALL_MSG); | |||
} | |||
@Test | |||
public void addReceived_sets_received_and_pending_counts_to_1_when_initPendingCount_has_not_been_called() { | |||
underTest.initPendingCount(INITIAL_PENDING_COUNT); | |||
underTest.addReceived(); | |||
assertThat(underTest.getReceivedCount()).isEqualTo(1); | |||
assertThat(underTest.getPendingCount()).isEqualTo(1); | |||
assertThat(underTest.getPendingCount()).isEqualTo(INITIAL_PENDING_COUNT + 1); | |||
} | |||
@Test | |||
public void addReceived_any_number_of_call_adds_1_per_call() { | |||
underTest.initPendingCount(INITIAL_PENDING_COUNT); | |||
int calls = new Random().nextInt(SOME_RANDOM_MAX); | |||
for (int i = 0; i < calls; i++) { | |||
underTest.addReceived(); | |||
} | |||
assertThat(underTest.getReceivedCount()).isEqualTo(calls); | |||
assertThat(underTest.getPendingCount()).isEqualTo(calls); | |||
assertThat(underTest.getPendingCount()).isEqualTo(INITIAL_PENDING_COUNT + calls); | |||
} | |||
@Test | |||
public void addInProgress_throws_ISE_if_called_before_initPendingCount() { | |||
expectedException.expect(IllegalStateException.class); | |||
expectedException.expectMessage("Method initPendingCount must be used before addInProgress can be called"); | |||
underTest.addInProgress(); | |||
} | |||
@Test | |||
public void addInProgress_increases_InProgress_and_decreases_Pending_by_1_without_check_on_Pending() { | |||
underTest.initPendingCount(INITIAL_PENDING_COUNT); | |||
underTest.addInProgress(); | |||
assertThat(underTest.getReceivedCount()).isEqualTo(0); | |||
assertThat(underTest.getPendingCount()).isEqualTo(-1); | |||
assertThat(underTest.getPendingCount()).isEqualTo(INITIAL_PENDING_COUNT - 1); | |||
assertThat(underTest.getInProgressCount()).isEqualTo(1); | |||
assertThat(underTest.getErrorCount()).isEqualTo(0); | |||
assertThat(underTest.getSuccessCount()).isEqualTo(0); | |||
@@ -123,13 +117,15 @@ public class CEQueueStatusImplTest { | |||
@Test | |||
public void addInProgress_any_number_of_call_change_by_1_per_call() { | |||
underTest.initPendingCount(INITIAL_PENDING_COUNT); | |||
int calls = new Random().nextInt(SOME_RANDOM_MAX); | |||
for (int i = 0; i < calls; i++) { | |||
underTest.addInProgress(); | |||
} | |||
assertThat(underTest.getInProgressCount()).isEqualTo(calls); | |||
assertThat(underTest.getPendingCount()).isEqualTo(-calls); | |||
assertThat(underTest.getPendingCount()).isEqualTo(INITIAL_PENDING_COUNT - calls); | |||
assertThat(underTest.getProcessingTime()).isEqualTo(0); | |||
} | |||
@@ -20,6 +20,7 @@ | |||
package org.sonar.server.computation.queue; | |||
import com.google.common.base.Optional; | |||
import org.junit.Before; | |||
import org.junit.Rule; | |||
import org.junit.Test; | |||
import org.junit.rules.ExpectedException; | |||
@@ -58,6 +59,11 @@ public class CeQueueImplTest { | |||
CeQueueListener listener = mock(CeQueueListener.class); | |||
CeQueue underTest = new CeQueueImpl(system2, dbTester.getDbClient(), uuidFactory, queueStatus, new CeQueueListener[] {listener}); | |||
@Before | |||
public void setUp() throws Exception { | |||
queueStatus.initPendingCount(0); | |||
} | |||
@Test | |||
public void test_submit() { | |||
CeTaskSubmit.Builder submission = underTest.prepareSubmit(); |