import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
import org.sonar.api.utils.System2;
+import org.sonar.api.utils.internal.AlwaysIncreasingSystem2;
import org.sonar.api.utils.internal.TestSystem2;
import org.sonar.db.DbTester;
private static final String WORKER_UUID_1 = "worker uuid 1";
private static final String WORKER_UUID_2 = "worker uuid 2";
private static final int EXECUTION_COUNT = 42;
+ private static final int MAX_EXECUTION_COUNT = 2;
private TestSystem2 system2 = new TestSystem2().setNow(INIT_TIME);
private CeQueueDao underTest = new CeQueueDao(system2);
private CeQueueDao underTestWithSystem2Mock = new CeQueueDao(mockedSystem2);
+ private CeQueueDao underTestAlwaysIncreasingSystem2 = new CeQueueDao(new AlwaysIncreasingSystem2());
@Test
public void insert_populates_createdAt_and_updateAt_from_System2_with_same_value_if_any_is_not_set() {
@Test
public void resetAllToPendingStatus_resets_startedAt() {
assertThat(insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING).getStartedAt()).isNull();
- assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).get().getUuid()).isEqualTo(TASK_UUID_1);
+ assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT).get().getUuid()).isEqualTo(TASK_UUID_1);
assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_1).get().getStartedAt()).isNotNull();
underTest.resetAllToPendingStatus(db.getSession());
@Test
public void peek_none_if_no_pendings() throws Exception {
- assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();
+ assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT).isPresent()).isFalse();
// not pending, but in progress
insert(TASK_UUID_1, COMPONENT_UUID_1, IN_PROGRESS);
- assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();
+ assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT).isPresent()).isFalse();
}
@Test
verifyCeQueueStatuses(TASK_UUID_1, PENDING, TASK_UUID_2, PENDING);
// peek first one
- Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1);
+ Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT);
assertThat(peek.isPresent()).isTrue();
assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1);
assertThat(peek.get().getStatus()).isEqualTo(IN_PROGRESS);
verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, PENDING);
// peek second one
- peek = underTest.peek(db.getSession(), WORKER_UUID_2);
+ peek = underTest.peek(db.getSession(), WORKER_UUID_2, MAX_EXECUTION_COUNT);
assertThat(peek.isPresent()).isTrue();
assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_2);
assertThat(peek.get().getStatus()).isEqualTo(IN_PROGRESS);
verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, IN_PROGRESS);
// no more pendings
- assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();
+ assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT).isPresent()).isFalse();
}
@Test
system2.setNow(INIT_TIME + 3_000_000);
insert(TASK_UUID_2, COMPONENT_UUID_1, PENDING);
- Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1);
+ Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT);
assertThat(peek.isPresent()).isTrue();
assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1);
assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_1);
verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, PENDING);
// do not peek second task as long as the first one is in progress
- peek = underTest.peek(db.getSession(), WORKER_UUID_1);
+ peek = underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT);
assertThat(peek.isPresent()).isFalse();
// first one is finished
underTest.deleteByUuid(db.getSession(), TASK_UUID_1);
- peek = underTest.peek(db.getSession(), WORKER_UUID_2);
+ peek = underTest.peek(db.getSession(), WORKER_UUID_2, MAX_EXECUTION_COUNT);
assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_2);
assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_2);
assertThat(peek.get().getExecutionCount()).isEqualTo(1);
}
+ @Test
+ public void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount_0() {
+ peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(0, null);
+ }
+
+ @Test
+ public void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount_1() {
+ peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(1, "u0");
+ }
+
+ @Test
+ public void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount_2() {
+ peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(2, "u1");
+ }
+
+ @Test
+ public void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount_3() {
+ peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(3, "u2");
+ }
+
+ @Test
+ public void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount_4() {
+ peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(4, "u3");
+ }
+
+ @Test
+ public void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount_more_then_4() {
+ peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(4 + Math.abs(new Random().nextInt(100)), "u3");
+ }
+
+ private void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(int maxExecutionCount, @Nullable String expected) {
+ insert("u3", CeQueueDto.Status.PENDING, 3);
+ insert("u2", CeQueueDto.Status.PENDING, 2);
+ insert("u1", CeQueueDto.Status.PENDING, 1);
+ insert("u0", CeQueueDto.Status.PENDING, 0);
+
+ Optional<CeQueueDto> dto = underTest.peek(db.getSession(), WORKER_UUID_1, maxExecutionCount);
+ if (expected == null) {
+ assertThat(dto.isPresent()).isFalse();
+ } else {
+ assertThat(dto.get().getUuid()).isEqualTo(expected);
+ }
+ }
+
@Test
public void select_by_query() {
// task status not in query
db.commit();
}
+ private CeQueueDto insert(String uuid, CeQueueDto.Status status, int executionCount) {
+ CeQueueDto dto = new CeQueueDto();
+ dto.setUuid(uuid);
+ dto.setTaskType(CeTaskTypes.REPORT);
+ dto.setStatus(status);
+ dto.setSubmitterLogin("henri");
+ dto.setExecutionCount(executionCount);
+ underTestAlwaysIncreasingSystem2.insert(db.getSession(), dto);
+ db.getSession().commit();
+ return dto;
+ }
+
private CeQueueDto insert(String uuid, String componentUuid, CeQueueDto.Status status) {
CeQueueDto dto = new CeQueueDto();
dto.setUuid(uuid);
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.List;
+import java.util.Random;
import javax.annotation.Nullable;
import org.junit.Rule;
import org.junit.Test;
}
@Test
- public void peek_increases_executionCount_and_override_workerUuid_to_argument() {
+ public void peek_overrides_workerUuid_to_argument() {
dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
.setUuid("uuid")
.setTaskType("foo")
.setStatus(CeQueueDto.Status.PENDING)
- .setWorkerUuid("must be overriden")
- .setExecutionCount(2));
+ .setWorkerUuid("must be overriden"));
dbTester.commit();
underTest.peek(WORKER_UUID_1);
CeQueueDto ceQueueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(session, "uuid").get();
assertThat(ceQueueDto.getWorkerUuid()).isEqualTo(WORKER_UUID_1);
- assertThat(ceQueueDto.getExecutionCount()).isEqualTo(3);
}
@Test
assertThat(peek.isPresent()).isFalse();
}
+ @Test
+ public void peek_peeks_pending_tasks_with_executionCount_equal_to_0_and_increases_it() {
+ dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
+ .setUuid("uuid")
+ .setTaskType("foo")
+ .setStatus(CeQueueDto.Status.PENDING)
+ .setExecutionCount(0));
+ dbTester.commit();
+
+ assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("uuid");
+ assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(session, "uuid").get().getExecutionCount()).isEqualTo(1);
+ }
+
+ @Test
+ public void peek_peeks_pending_tasks_with_executionCount_equal_to_1_and_increases_it() {
+ dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
+ .setUuid("uuid")
+ .setTaskType("foo")
+ .setStatus(CeQueueDto.Status.PENDING)
+ .setExecutionCount(1));
+ dbTester.commit();
+
+ assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("uuid");
+ assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(session, "uuid").get().getExecutionCount()).isEqualTo(2);
+ }
+
+ @Test
+ public void peek_ignores_pending_tasks_with_executionCount_equal_to_2() {
+ dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
+ .setUuid("uuid")
+ .setTaskType("foo")
+ .setStatus(CeQueueDto.Status.PENDING)
+ .setExecutionCount(2));
+ dbTester.commit();
+
+ assertThat(underTest.peek(WORKER_UUID_1).isPresent()).isFalse();
+ }
+
+ @Test
+ public void peek_ignores_pending_tasks_with_executionCount_greater_than_2() {
+ dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
+ .setUuid("uuid")
+ .setTaskType("foo")
+ .setStatus(CeQueueDto.Status.PENDING)
+ .setExecutionCount(2 + Math.abs(new Random().nextInt(100))));
+ dbTester.commit();
+
+ assertThat(underTest.peek(WORKER_UUID_1).isPresent()).isFalse();
+ }
+
@Test
public void cancel_pending() throws Exception {
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");