Browse Source

Drop usage of db column ce_queue.execution_count

tags/7.5
Simon Brandhof 6 years ago
parent
commit
b24f836dbe
18 changed files with 123 additions and 468 deletions
  1. 0
    11
      server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java
  2. 0
    2
      server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java
  3. 2
    17
      server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
  4. 0
    21
      server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java
  5. 45
    148
      server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
  6. 0
    12
      server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeActivityDto.java
  7. 10
    10
      server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java
  8. 0
    15
      server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDto.java
  9. 3
    3
      server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java
  10. 0
    51
      server/sonar-db-dao/src/main/java/org/sonar/db/ce/EligibleTaskDto.java
  11. 2
    14
      server/sonar-db-dao/src/main/java/org/sonar/db/ce/UpdateIf.java
  12. 1
    2
      server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeActivityMapper.xml
  13. 12
    19
      server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml
  14. 0
    2
      server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeActivityDaoTest.java
  15. 44
    124
      server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java
  16. 0
    11
      server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDtoTest.java
  17. 0
    1
      server/sonar-db-dao/src/test/java/org/sonar/db/purge/PurgeDaoTest.java
  18. 4
    5
      server/sonar-server/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java

+ 0
- 11
server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java View File

@@ -56,10 +56,8 @@ public class CeCleaningSchedulerImpl implements CeCleaningScheduler {
Lock ceCleaningJobLock = ceDistributedInformation.acquireCleanJobLock();

// If we cannot lock that means that another job is running
// So we skip the cancelWornOuts() method
if (ceCleaningJobLock.tryLock()) {
try {
cancelWornOuts();
resetTasksWithUnknownWorkerUUIDs();
} finally {
ceCleaningJobLock.unlock();
@@ -67,15 +65,6 @@ public class CeCleaningSchedulerImpl implements CeCleaningScheduler {
}
}

private void cancelWornOuts() {
try {
LOG.debug("Deleting any worn out task");
internalCeQueue.cancelWornOuts();
} catch (Exception e) {
LOG.warn("Failed to cancel worn out tasks", e);
}
}

private void resetTasksWithUnknownWorkerUUIDs() {
try {
LOG.debug("Resetting state of tasks with unknown worker UUIDs");

+ 0
- 2
server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java View File

@@ -66,7 +66,5 @@ public interface InternalCeQueue extends CeQueue {
*/
void remove(CeTask task, Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error);

void cancelWornOuts();

void resetTasksWithUnknownWorkerUUIDs(Set<String> knownWorkerUUIDs);
}

+ 2
- 17
server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java View File

@@ -52,8 +52,6 @@ import static java.util.Objects.requireNonNull;
public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue {
private static final Logger LOG = Loggers.get(InternalCeQueueImpl.class);

private static final int MAX_EXECUTION_COUNT = 1;

private final System2 system2;
private final DbClient dbClient;
private final CEQueueStatus queueStatus;
@@ -81,7 +79,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
if (i > 0) {
LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid);
}
Optional<CeQueueDto> dto = ceQueueDao.peek(dbSession, workerUuid, MAX_EXECUTION_COUNT);
Optional<CeQueueDto> dto = ceQueueDao.peek(dbSession, workerUuid);
CeTask task = null;
if (dto.isPresent()) {
task = loadTask(dbSession, dto.get());
@@ -101,7 +99,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
checkArgument(error == null || status == CeActivityDto.Status.FAILED, "Error can be provided only when status is FAILED");
try (DbSession dbSession = dbClient.openSession(false)) {
CeQueueDto queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, task.getUuid())
.orElseThrow(() -> new IllegalStateException("Task does not exist anymore: " + task));
.orElseThrow(() -> new IllegalStateException("Task does not exist anymore: " + task));
CeActivityDto activityDto = new CeActivityDto(queueDto);
activityDto.setStatus(status);
updateQueueStatus(status, activityDto);
@@ -165,19 +163,6 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
}
}

@Override
public void cancelWornOuts() {
try (DbSession dbSession = dbClient.openSession(false)) {
List<CeQueueDto> wornOutTasks = dbClient.ceQueueDao().selectPendingByMinimumExecutionCount(dbSession, MAX_EXECUTION_COUNT);
wornOutTasks.forEach(queueDto -> {
CeActivityDto activityDto = new CeActivityDto(queueDto);
activityDto.setStatus(CeActivityDto.Status.CANCELED);
updateQueueStatus(CeActivityDto.Status.CANCELED, activityDto);
remove(dbSession, queueDto, activityDto);
});
}
}

@Override
public void resetTasksWithUnknownWorkerUUIDs(Set<String> knownWorkerUUIDs) {
try (DbSession dbSession = dbClient.openSession(false)) {

+ 0
- 21
server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java View File

@@ -50,32 +50,13 @@ public class CeCleaningSchedulerImplTest {
CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock);
CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation);
Exception exception = new IllegalArgumentException("faking unchecked exception thrown by cancelWornOuts");
doThrow(exception).when(mockedInternalCeQueue).cancelWornOuts();
doThrow(exception).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());

underTest.startScheduling();

verify(mockedInternalCeQueue).cancelWornOuts();
verify(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
}

@Test
public void startScheduling_fails_if_cancelWornOuts_send_an_Error() {
InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock);
CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation);
Error expected = new Error("faking Error thrown by cancelWornOuts");
doThrow(expected).when(mockedInternalCeQueue).cancelWornOuts();

try {
underTest.startScheduling();
fail("the error should have been thrown");
} catch (Error e) {
assertThat(e).isSameAs(expected);
}
verify(mockedInternalCeQueue).cancelWornOuts();
}

@Test
public void startScheduling_fails_if_resetTasksWithUnknownWorkerUUIDs_send_an_Error() {
InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
@@ -120,7 +101,6 @@ public class CeCleaningSchedulerImplTest {
verify(jobLock, times(0)).unlock();
// since lock cannot be locked, cleaning job methods must not be called
verify(mockedInternalCeQueue, times(0)).resetTasksWithUnknownWorkerUUIDs(any());
verify(mockedInternalCeQueue, times(0)).cancelWornOuts();
}

@Test
@@ -159,7 +139,6 @@ public class CeCleaningSchedulerImplTest {

underTest.startScheduling();
assertThat(executorService.schedulerCounter).isEqualTo(1);
verify(mockedInternalCeQueue).cancelWornOuts();
}

private CeConfiguration mockCeConfiguration(long cleanCeTasksInitialDelay, long cleanCeTasksDelay) {

+ 45
- 148
server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java View File

@@ -24,7 +24,6 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import javax.annotation.Nullable;
import org.junit.Before;
import org.junit.Rule;
@@ -49,7 +48,6 @@ import org.sonar.server.computation.task.step.TypedException;
import org.sonar.server.organization.DefaultOrganization;
import org.sonar.server.organization.DefaultOrganizationProvider;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
@@ -258,13 +256,12 @@ public class InternalCeQueueImplTest {
}

@Test
public void remove_copies_executionCount_and_workerUuid() {
public void remove_copies_workerUuid() {
db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
.setUuid("uuid")
.setTaskType("foo")
.setStatus(CeQueueDto.Status.PENDING)
.setWorkerUuid("Dustin")
.setExecutionCount(2));
.setWorkerUuid("Dustin"));
db.commit();

underTest.remove(new CeTask.Builder()
@@ -274,7 +271,6 @@ public class InternalCeQueueImplTest {
.build(), CeActivityDto.Status.SUCCESS, null, null);

CeActivityDto dto = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), "uuid").get();
assertThat(dto.getExecutionCount()).isEqualTo(2);
assertThat(dto.getWorkerUuid()).isEqualTo("Dustin");
}

@@ -342,77 +338,35 @@ public class InternalCeQueueImplTest {
}

@Test
public void peek_peeks_pending_tasks_with_executionCount_equal_to_0_and_increases_it() {
public void peek_peeks_pending_task() {
db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
.setUuid("uuid")
.setTaskType("foo")
.setStatus(CeQueueDto.Status.PENDING)
.setExecutionCount(0));
.setStatus(CeQueueDto.Status.PENDING));
db.commit();

assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("uuid");
assertThat(db.getDbClient().ceQueueDao().selectByUuid(session, "uuid").get().getExecutionCount()).isEqualTo(1);
}

@Test
public void peek_ignores_pending_tasks_with_executionCount_equal_to_1() {
db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
.setUuid("uuid")
.setTaskType("foo")
.setStatus(CeQueueDto.Status.PENDING)
.setExecutionCount(1));
db.commit();

assertThat(underTest.peek(WORKER_UUID_1).isPresent()).isFalse();
}

@Test
public void peek_ignores_pending_tasks_with_executionCount_equal_to_2() {
db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
.setUuid("uuid")
.setTaskType("foo")
.setStatus(CeQueueDto.Status.PENDING)
.setExecutionCount(2));
db.commit();

assertThat(underTest.peek(WORKER_UUID_1).isPresent()).isFalse();
}

@Test
public void peek_ignores_pending_tasks_with_executionCount_greater_than_2() {
db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
.setUuid("uuid")
.setTaskType("foo")
.setStatus(CeQueueDto.Status.PENDING)
.setExecutionCount(2 + Math.abs(new Random().nextInt(100))));
db.commit();

assertThat(underTest.peek(WORKER_UUID_1).isPresent()).isFalse();
}

@Test
public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_updates_updatedAt_no_matter_execution_count() {
insertPending("u0", "doesn't matter", 0); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB
CeQueueDto u1 = insertPending("u1", WORKER_UUID_1, 2);// won't be peeked because it's worn-out
CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_1, 3);// will be reset but won't be picked because it's worn-out
CeQueueDto u3 = insertPending("u3", WORKER_UUID_1, 1);// will be picked-because older than any of the reset ones
CeQueueDto u4 = insertInProgress("u4", WORKER_UUID_1, 1);// will be reset
public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_updates_updatedAt() {
insertPending("u0", "doesn't matter"); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB
CeQueueDto u1 = insertPending("u1", WORKER_UUID_1);// will be picked-because older than any of the reset ones
CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_1);// will be reset

assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u0");

verifyUnmodifiedTask(u1);
verifyResetTask(u2);
verifyUnmodifiedTask(u3);
verifyResetTask(u4);
}

@Test
public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_only_this_uuid() {
insertPending("u0", "doesn't matter", 0); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB
CeQueueDto u1 = insertInProgress("u1", WORKER_UUID_1, 3);
CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_2, 3);
CeQueueDto u3 = insertInProgress("u3", WORKER_UUID_1, 3);
CeQueueDto u4 = insertInProgress("u4", WORKER_UUID_2, 1);
insertPending("u0", "doesn't matter"); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB
CeQueueDto u1 = insertInProgress("u1", WORKER_UUID_1);
CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_2);
CeQueueDto u3 = insertInProgress("u3", WORKER_UUID_1);
CeQueueDto u4 = insertInProgress("u4", WORKER_UUID_2);

assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u0");

@@ -422,32 +376,9 @@ public class InternalCeQueueImplTest {
verifyUnmodifiedTask(u4);
}

@Test
public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_peeks_the_oldest_non_worn_out_no_matter_if_it_has_been_reset_or_not() {
insertPending("u1", WORKER_UUID_1, 3); // won't be picked because worn out
insertInProgress("u2", WORKER_UUID_1, 3); // will be reset but won't be picked because worn out
insertInProgress("u3", WORKER_UUID_1, 1); // will be reset but won't be picked because worn out
insertPending("u4", WORKER_UUID_1, 0); // will be picked

Optional<CeTask> ceTask = underTest.peek(WORKER_UUID_1);
assertThat(ceTask.get().getUuid()).isEqualTo("u4");
}

@Test
public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_peeks_reset_tasks_if_is_the_oldest_non_worn_out() {
insertPending("u1", WORKER_UUID_1, 3); // won't be picked because worn out
insertInProgress("u2", WORKER_UUID_1, 3); // will be reset but won't be picked because worn out
insertInProgress("u3", WORKER_UUID_1, 1); // won't be picked because worn out
insertPending("u4", WORKER_UUID_1, 0); // will be picked second

Optional<CeTask> ceTask = underTest.peek(WORKER_UUID_1);
assertThat(ceTask.get().getUuid()).isEqualTo("u4");
}

private void verifyResetTask(CeQueueDto originalDto) {
CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(session, originalDto.getUuid()).get();
assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING);
assertThat(dto.getExecutionCount()).isEqualTo(originalDto.getExecutionCount());
assertThat(dto.getCreatedAt()).isEqualTo(originalDto.getCreatedAt());
assertThat(dto.getUpdatedAt()).isGreaterThan(originalDto.getUpdatedAt());
}
@@ -455,32 +386,27 @@ public class InternalCeQueueImplTest {
private void verifyUnmodifiedTask(CeQueueDto originalDto) {
CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(session, originalDto.getUuid()).get();
assertThat(dto.getStatus()).isEqualTo(originalDto.getStatus());
assertThat(dto.getExecutionCount()).isEqualTo(originalDto.getExecutionCount());
assertThat(dto.getCreatedAt()).isEqualTo(originalDto.getCreatedAt());
assertThat(dto.getUpdatedAt()).isEqualTo(originalDto.getUpdatedAt());
}

private CeQueueDto insertInProgress(String uuid, String workerUuid, int executionCount) {
checkArgument(executionCount > 0, "execution count less than 1 does not make sense for an in progress task");
private CeQueueDto insertInProgress(String uuid, String workerUuid) {
CeQueueDto dto = new CeQueueDto()
.setUuid(uuid)
.setTaskType("foo")
.setStatus(CeQueueDto.Status.IN_PROGRESS)
.setWorkerUuid(workerUuid)
.setExecutionCount(executionCount);
.setWorkerUuid(workerUuid);
db.getDbClient().ceQueueDao().insert(session, dto);
db.commit();
return dto;
}

private CeQueueDto insertPending(String uuid, String workerUuid, int executionCount) {
checkArgument(executionCount > -1, "execution count less than 0 does not make sense for a pending task");
private CeQueueDto insertPending(String uuid, String workerUuid) {
CeQueueDto dto = new CeQueueDto()
.setUuid(uuid)
.setTaskType("foo")
.setStatus(CeQueueDto.Status.PENDING)
.setWorkerUuid(workerUuid)
.setExecutionCount(executionCount);
.setWorkerUuid(workerUuid);
db.getDbClient().ceQueueDao().insert(session, dto);
db.commit();
return dto;
@@ -499,19 +425,17 @@ public class InternalCeQueueImplTest {
}

@Test
public void cancel_copies_executionCount_and_workerUuid() {
public void cancel_copies_workerUuid() {
CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
.setUuid("uuid")
.setTaskType("foo")
.setStatus(CeQueueDto.Status.PENDING)
.setWorkerUuid("Dustin")
.setExecutionCount(2));
.setWorkerUuid("Dustin"));
db.commit();

underTest.cancel(db.getSession(), ceQueueDto);

CeActivityDto dto = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), "uuid").get();
assertThat(dto.getExecutionCount()).isEqualTo(2);
assertThat(dto.getWorkerUuid()).isEqualTo("Dustin");
}

@@ -545,39 +469,16 @@ public class InternalCeQueueImplTest {
assertThat(history.isPresent()).isFalse();
}

@Test
public void cancelWornOuts_cancels_pending_tasks_with_executionCount_greater_or_equal_to_1() {
CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, "worker1");
CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1");
CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, "worker1");
CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker1");
CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, "worker1");
CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1");
CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker1");
CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker1");

underTest.cancelWornOuts();

verifyUnmodified(u1);
verifyCanceled(u2);
verifyCanceled(u3);
verifyCanceled(u4);
verifyUnmodified(u5);
verifyUnmodified(u6);
verifyUnmodified(u7);
verifyUnmodified(u8);
}

@Test
public void resetTasksWithUnknownWorkerUUIDs_reset_only_in_progress_tasks() {
CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, null);
CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1");
CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, null);
CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker2");
CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, null);
CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1");
CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker2");
CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker3");
CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, null);
CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, "worker1");
CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, null);
CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, "worker2");
CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, null);
CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, "worker1");
CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, "worker2");
CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, "worker3");

underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of("worker2", "worker3"));

@@ -598,14 +499,14 @@ public class InternalCeQueueImplTest {

@Test
public void resetTasksWithUnknownWorkerUUIDs_with_empty_set_will_reset_all_in_progress_tasks() {
CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, null);
CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1");
CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, null);
CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker2");
CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, null);
CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1");
CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker2");
CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker3");
CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, null);
CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, "worker1");
CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, null);
CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, "worker2");
CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, null);
CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, "worker1");
CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, "worker2");
CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, "worker3");

underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of());

@@ -624,14 +525,14 @@ public class InternalCeQueueImplTest {

@Test
public void resetTasksWithUnknownWorkerUUIDs_with_worker_without_tasks_will_reset_all_in_progress_tasks() {
CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, null);
CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1");
CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, null);
CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker2");
CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, null);
CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1");
CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker2");
CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker3");
CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, null);
CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, "worker1");
CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, null);
CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, "worker2");
CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, null);
CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, "worker1");
CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, "worker2");
CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, "worker3");

underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of("worker1000", "worker1001"));

@@ -650,8 +551,7 @@ public class InternalCeQueueImplTest {

private void verifyReset(CeQueueDto original) {
CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
// We do not touch ExecutionCount nor CreatedAt
assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount());
// We do not touch CreatedAt
assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());

// Status must have changed to PENDING and must not be equal to previous status
@@ -667,7 +567,6 @@ public class InternalCeQueueImplTest {
private void verifyUnmodified(CeQueueDto original) {
CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
assertThat(dto.getStatus()).isEqualTo(original.getStatus());
assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount());
assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
assertThat(dto.getUpdatedAt()).isEqualTo(original.getUpdatedAt());
}
@@ -676,16 +575,14 @@ public class InternalCeQueueImplTest {
assertThat(db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid())).isEmpty();
CeActivityDto dto = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), original.getUuid()).get();
assertThat(dto.getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount());
assertThat(dto.getWorkerUuid()).isEqualTo(original.getWorkerUuid());
}

private CeQueueDto insertCeQueueDto(String uuid, CeQueueDto.Status status, int executionCount, String workerUuid) {
private CeQueueDto insertCeQueueDto(String uuid, CeQueueDto.Status status, String workerUuid) {
CeQueueDto dto = new CeQueueDto()
.setUuid(uuid)
.setTaskType("foo")
.setStatus(status)
.setExecutionCount(executionCount)
.setWorkerUuid(workerUuid);
db.getDbClient().ceQueueDao().insert(db.getSession(), dto);
db.commit();

+ 0
- 12
server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeActivityDto.java View File

@@ -44,7 +44,6 @@ public class CeActivityDto {
private String isLastKey;
private String submitterLogin;
private String workerUuid;
private int executionCount;
private long submittedAt;
private Long startedAt;
private Long executedAt;
@@ -95,7 +94,6 @@ public class CeActivityDto {
this.isLastKey = format("%s%s", taskType, Strings.nullToEmpty(componentUuid));
this.submitterLogin = queueDto.getSubmitterLogin();
this.workerUuid = queueDto.getWorkerUuid();
this.executionCount = queueDto.getExecutionCount();
this.submittedAt = queueDto.getCreatedAt();
this.startedAt = queueDto.getStartedAt();
}
@@ -235,15 +233,6 @@ public class CeActivityDto {
return this;
}

public int getExecutionCount() {
return executionCount;
}

public CeActivityDto setExecutionCount(int executionCount) {
this.executionCount = executionCount;
return this;
}

@CheckForNull
public String getErrorMessage() {
return errorMessage;
@@ -306,7 +295,6 @@ public class CeActivityDto {
", isLastKey='" + isLastKey + '\'' +
", submitterLogin='" + submitterLogin + '\'' +
", workerUuid='" + workerUuid + '\'' +
", executionCount=" + executionCount +
", submittedAt=" + submittedAt +
", startedAt=" + startedAt +
", executedAt=" + executedAt +

+ 10
- 10
server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java View File

@@ -85,8 +85,8 @@ public class CeQueueDao implements Dao {
return Optional.ofNullable(mapper(session).selectByUuid(uuid));
}

public List<CeQueueDto> selectPendingByMinimumExecutionCount(DbSession dbSession, int minExecutionCount) {
return mapper(dbSession).selectPendingByMinimumExecutionCount(minExecutionCount);
public List<CeQueueDto> selectPending(DbSession dbSession) {
return mapper(dbSession).selectPending();
}

public void resetTasksWithUnknownWorkerUUIDs(DbSession dbSession, Set<String> knownWorkerUUIDs) {
@@ -151,26 +151,26 @@ public class CeQueueDao implements Dao {
return builder.build();
}

public Optional<CeQueueDto> peek(DbSession session, String workerUuid, int maxExecutionCount) {
List<EligibleTaskDto> eligibles = mapper(session).selectEligibleForPeek(maxExecutionCount, ONE_RESULT_PAGINATION);
public Optional<CeQueueDto> peek(DbSession session, String workerUuid) {
List<String> eligibles = mapper(session).selectEligibleForPeek(ONE_RESULT_PAGINATION);
if (eligibles.isEmpty()) {
return Optional.empty();
}

EligibleTaskDto eligible = eligibles.get(0);
String eligible = eligibles.get(0);
return tryToPeek(session, eligible, workerUuid);
}

private Optional<CeQueueDto> tryToPeek(DbSession session, EligibleTaskDto eligible, String workerUuid) {
private Optional<CeQueueDto> tryToPeek(DbSession session, String eligibleTaskUuid, String workerUuid) {
long now = system2.now();
int touchedRows = mapper(session).updateIf(eligible.getUuid(),
new UpdateIf.NewProperties(IN_PROGRESS, workerUuid, eligible.getExecutionCount() + 1, now, now),
new UpdateIf.OldProperties(PENDING, eligible.getExecutionCount()));
int touchedRows = mapper(session).updateIf(eligibleTaskUuid,
new UpdateIf.NewProperties(IN_PROGRESS, workerUuid, now, now),
new UpdateIf.OldProperties(PENDING));
if (touchedRows != 1) {
return Optional.empty();
}

CeQueueDto result = mapper(session).selectByUuid(eligible.getUuid());
CeQueueDto result = mapper(session).selectByUuid(eligibleTaskUuid);
session.commit();
return Optional.ofNullable(result);
}

+ 0
- 15
server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDto.java View File

@@ -39,10 +39,6 @@ public class CeQueueDto {
* UUID of the worker that is executing, or of the last worker that executed, the current task.
*/
private String workerUuid;
/**
* This counter is incremented by 1 each time the tasks switches to status {@link Status#IN_PROGRESS IN_PROGRESS}.
*/
private int executionCount = 0;
private Long startedAt;
private long createdAt;
private long updatedAt;
@@ -108,16 +104,6 @@ public class CeQueueDto {
return this;
}

public int getExecutionCount() {
return executionCount;
}

public CeQueueDto setExecutionCount(int executionCount) {
checkArgument(executionCount >= 0, "execution count can't be < 0");
this.executionCount = executionCount;
return this;
}

@CheckForNull
public Long getStartedAt() {
return startedAt;
@@ -155,7 +141,6 @@ public class CeQueueDto {
", status=" + status +
", submitterLogin='" + submitterLogin + '\'' +
", workerUuid='" + workerUuid + '\'' +
", executionCount=" + executionCount +
", startedAt=" + startedAt +
", createdAt=" + createdAt +
", updatedAt=" + updatedAt +

+ 3
- 3
server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java View File

@@ -37,15 +37,15 @@ public interface CeQueueMapper {

int countByQuery(@Param("query") CeTaskQuery query);

List<EligibleTaskDto> selectEligibleForPeek(@Param("maxExecutionCount") int maxExecutionCount, @Param("pagination") Pagination pagination);
List<String> selectEligibleForPeek(@Param("pagination") Pagination pagination);

@CheckForNull
CeQueueDto selectByUuid(@Param("uuid") String uuid);

/**
* Select all pending tasks which execution count is greater than or equal to the specified {@code minExecutionCount}.
* Select all pending tasks
*/
List<CeQueueDto> selectPendingByMinimumExecutionCount(@Param("minExecutionCount") int minExecutionCount);
List<CeQueueDto> selectPending();

/**
* Select all tasks whose worker UUID is not present in {@code knownWorkerUUIDs}

+ 0
- 51
server/sonar-db-dao/src/main/java/org/sonar/db/ce/EligibleTaskDto.java View File

@@ -1,51 +0,0 @@
/*
* SonarQube
* Copyright (C) 2009-2018 SonarSource SA
* mailto:info AT sonarsource DOT com
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package org.sonar.db.ce;

public class EligibleTaskDto {
private String uuid;
private int executionCount;

public String getUuid() {
return uuid;
}

public EligibleTaskDto setUuid(String uuid) {
this.uuid = uuid;
return this;
}

public int getExecutionCount() {
return executionCount;
}

public EligibleTaskDto setExecutionCount(int executionCount) {
this.executionCount = executionCount;
return this;
}

@Override
public String toString() {
return "EligibleTaskDto{" +
"uuid='" + uuid + '\'' +
", executionCount=" + executionCount +
'}';
}
}

+ 2
- 14
server/sonar-db-dao/src/main/java/org/sonar/db/ce/UpdateIf.java View File

@@ -34,15 +34,13 @@ final class UpdateIf {
public static class NewProperties {
private final CeQueueDto.Status status;
private final String workerUuid;
private final int executionCount;
private final Long startedAt;
private final long updatedAt;

NewProperties(CeQueueDto.Status status, @Nullable String workerUuid, int executionCount,
NewProperties(CeQueueDto.Status status, @Nullable String workerUuid,
Long startedAt, long updatedAt) {
this.status = requireNonNull(status, "status can't be null");
this.workerUuid = workerUuid;
this.executionCount = executionCount;
this.startedAt = startedAt;
this.updatedAt = updatedAt;
}
@@ -56,10 +54,6 @@ final class UpdateIf {
return workerUuid;
}

public int getExecutionCount() {
return executionCount;
}

@CheckForNull
public Long getStartedAt() {
return startedAt;
@@ -73,20 +67,14 @@ final class UpdateIf {
@Immutable
public static class OldProperties {
private final CeQueueDto.Status status;
private final int executionCount;

OldProperties(CeQueueDto.Status status, int executionCount) {
OldProperties(CeQueueDto.Status status) {
this.status = requireNonNull(status, "status can't be null");
this.executionCount = executionCount;
}

public CeQueueDto.Status getStatus() {
return status;
}

public int getExecutionCount() {
return executionCount;
}
}

}

+ 1
- 2
server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeActivityMapper.xml View File

@@ -25,7 +25,6 @@
ca.submitter_login as submitterLogin,
ca.submitted_at as submittedAt,
ca.worker_uuid as workerUuid,
ca.execution_count as executionCount,
ca.started_at as startedAt,
ca.executed_at as executedAt,
ca.created_at as createdAt,
@@ -165,7 +164,7 @@
#{submitterLogin,jdbcType=VARCHAR},
#{submittedAt,jdbcType=BIGINT},
#{workerUuid,jdbcType=VARCHAR},
#{executionCount,jdbcType=INTEGER},
0,
#{startedAt,jdbcType=BIGINT},
#{executedAt,jdbcType=BIGINT},
#{createdAt,jdbcType=BIGINT},

+ 12
- 19
server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml View File

@@ -10,7 +10,6 @@
cq.status as status,
cq.submitter_login as submitterLogin,
cq.worker_uuid as workerUuid,
cq.execution_count as executionCount,
cq.started_at as startedAt,
cq.created_at as createdAt,
cq.updated_at as updatedAt
@@ -127,33 +126,32 @@
</where>
</sql>

<select id="selectEligibleForPeek" resultType="org.sonar.db.ce.EligibleTaskDto">
select
<include refid="columnsSelectEligibleForPeek"/>
<select id="selectEligibleForPeek" resultType="String">
select cq.uuid
<include refid="sqlSelectEligibleForPeek"/>
<include refid="orderBySelectEligibleForPeek"/>
limit #{pagination.pageSize,jdbcType=INTEGER} offset #{pagination.offset,jdbcType=INTEGER}
</select>

<select id="selectEligibleForPeek" parameterType="map" resultType="org.sonar.db.ce.EligibleTaskDto" databaseId="mssql">
select * from (
<select id="selectEligibleForPeek" parameterType="map" resultType="String" databaseId="mssql">
select query.uuid from (
select
row_number() over(<include refid="orderBySelectEligibleForPeek"/>) as number,
<include refid="columnsSelectEligibleForPeek"/>
<include refid="sqlSelectEligibleForPeek"/>
) as query
where
query.number between #{pagination.startRowNumber,jdbcType=INTEGER} and #{pagination.endRowNumber,jdbcType=INTEGER}
query.number between #{pagination.startRowNumber,jdbcType=INTEGER} and #{pagination.endRowNumber,jdbcType=INTEGER}
<include refid="orderBySelectEligibleForPeek"/>
</select>

<select id="selectEligibleForPeek" parameterType="map" resultType="org.sonar.db.ce.EligibleTaskDto" databaseId="oracle">
select * from (
select rownum as rn, t.* from (
<select id="selectEligibleForPeek" parameterType="map" resultType="String" databaseId="oracle">
select taskuuid from (
select rownum as rn, t."uuid" as taskuuid from (
select
<include refid="columnsSelectEligibleForPeek"/>
<include refid="sqlSelectEligibleForPeek" />
<include refid="orderBySelectEligibleForPeek"/>
<include refid="sqlSelectEligibleForPeek" />
<include refid="orderBySelectEligibleForPeek"/>
) t
) t
where
@@ -162,7 +160,6 @@

<sql id="columnsSelectEligibleForPeek">
cq.uuid as "uuid",
cq.execution_count as "executionCount",
cq.created_at as "created_at",
cq.id as "id"
</sql>
@@ -172,7 +169,6 @@
ce_queue cq
where
cq.status='PENDING'
and cq.execution_count &lt; #{maxExecutionCount,jdbcType=INTEGER}
and not exists (
select
1
@@ -190,14 +186,13 @@
id asc
</sql>

<select id="selectPendingByMinimumExecutionCount" resultType="org.sonar.db.ce.CeQueueDto">
<select id="selectPending" resultType="org.sonar.db.ce.CeQueueDto">
select
<include refid="columns"/>
from
ce_queue cq
where
cq.status = 'PENDING'
and cq.execution_count >= #{minExecutionCount,jdbcType=INTEGER}
</select>

<insert id="insert" parameterType="org.sonar.db.ce.CeQueueDto" useGeneratedKeys="false">
@@ -221,7 +216,7 @@
#{status,jdbcType=VARCHAR},
#{submitterLogin,jdbcType=VARCHAR},
#{workerUuid,jdbcType=VARCHAR},
#{executionCount,jdbcType=INTEGER},
0,
#{startedAt,jdbcType=BIGINT},
#{createdAt,jdbcType=BIGINT},
#{updatedAt,jdbcType=BIGINT}
@@ -241,14 +236,12 @@
<update id="updateIf" parameterType="map">
update ce_queue set
status=#{new.status,jdbcType=VARCHAR},
execution_count=#{new.executionCount,jdbcType=INTEGER},
worker_uuid=#{new.workerUuid,jdbcType=VARCHAR},
started_at=#{new.startedAt,jdbcType=BIGINT},
updated_at=#{new.updatedAt,jdbcType=BIGINT}
where
uuid=#{uuid,jdbcType=VARCHAR}
and status=#{old.status,jdbcType=VARCHAR}
and execution_count=#{old.executionCount,jdbcType=INTEGER}
</update>

<delete id="deleteByUuid">

+ 0
- 2
server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeActivityDaoTest.java View File

@@ -66,7 +66,6 @@ public class CeActivityDaoTest {
assertThat(dto.getSubmitterLogin()).isEqualTo("henri");
assertThat(dto.getSubmittedAt()).isEqualTo(1_300_000_000_000L);
assertThat(dto.getWorkerUuid()).isEqualTo("worker uuid");
assertThat(dto.getExecutionCount()).isEqualTo(42);
assertThat(dto.getIsLast()).isTrue();
assertThat(dto.getIsLastKey()).isEqualTo("REPORTPROJECT_1");
assertThat(dto.getCreatedAt()).isEqualTo(1_450_000_000_000L);
@@ -351,7 +350,6 @@ public class CeActivityDaoTest {
queueDto.setComponentUuid(componentUuid);
queueDto.setSubmitterLogin("henri");
queueDto.setWorkerUuid("worker uuid");
queueDto.setExecutionCount(42);
queueDto.setCreatedAt(1_300_000_000_000L);

CeActivityDto dto = new CeActivityDto(queueDto);

+ 44
- 124
server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java View File

@@ -27,7 +27,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.junit.Rule;
@@ -60,8 +59,6 @@ public class CeQueueDaoTest {
private static final String SUBMITTER_LOGIN = "henri";
private static final String WORKER_UUID_1 = "worker uuid 1";
private static final String WORKER_UUID_2 = "worker uuid 2";
private static final int EXECUTION_COUNT = 42;
private static final int MAX_EXECUTION_COUNT = 2;

private TestSystem2 system2 = new TestSystem2().setNow(INIT_TIME);

@@ -82,8 +79,7 @@ public class CeQueueDaoTest {
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(PENDING)
.setSubmitterLogin(SUBMITTER_LOGIN)
.setWorkerUuid(WORKER_UUID_1)
.setExecutionCount(EXECUTION_COUNT);
.setWorkerUuid(WORKER_UUID_1);

mockSystem2ForSingleCall(now);
underTestWithSystem2Mock.insert(db.getSession(), dto.setUuid(TASK_UUID_1));
@@ -105,7 +101,6 @@ public class CeQueueDaoTest {
assertThat(saved.getStatus()).isEqualTo(PENDING);
assertThat(saved.getSubmitterLogin()).isEqualTo(SUBMITTER_LOGIN);
assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID_1);
assertThat(saved.getExecutionCount()).isEqualTo(EXECUTION_COUNT);
assertThat(saved.getCreatedAt()).isEqualTo(now);
assertThat(saved.getUpdatedAt()).isEqualTo(now);
assertThat(saved.getStartedAt()).isNull();
@@ -117,7 +112,6 @@ public class CeQueueDaoTest {
assertThat(saved.getStatus()).isEqualTo(PENDING);
assertThat(saved.getSubmitterLogin()).isEqualTo(SUBMITTER_LOGIN);
assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID_1);
assertThat(saved.getExecutionCount()).isEqualTo(EXECUTION_COUNT);
assertThat(saved.getCreatedAt()).isEqualTo(6_888_777L);
assertThat(saved.getUpdatedAt()).isEqualTo(8_000_999L);
assertThat(saved.getStartedAt()).isNull();
@@ -135,7 +129,6 @@ public class CeQueueDaoTest {
assertThat(saved.getStatus()).isEqualTo(PENDING);
assertThat(saved.getSubmitterLogin()).isEqualTo("henri");
assertThat(saved.getWorkerUuid()).isNull();
assertThat(saved.getExecutionCount()).isEqualTo(0);
assertThat(saved.getCreatedAt()).isEqualTo(INIT_TIME);
assertThat(saved.getUpdatedAt()).isEqualTo(INIT_TIME);
assertThat(saved.getStartedAt()).isNull();
@@ -162,32 +155,17 @@ public class CeQueueDaoTest {
}

@Test
public void selectPendingByMinimumExecutionCount_returns_pending_tasks_with_executionCount_greater_or_equal_to_argument() {
insert("p1", CeQueueDto.Status.PENDING, 0);
insert("p2", CeQueueDto.Status.PENDING, 1);
insert("p3", CeQueueDto.Status.PENDING, 2);
insert("i1", CeQueueDto.Status.IN_PROGRESS, 0);
insert("i2", CeQueueDto.Status.IN_PROGRESS, 1);
insert("i3", CeQueueDto.Status.IN_PROGRESS, 2);
assertThat(underTest.selectPendingByMinimumExecutionCount(db.getSession(), 0))
public void selectPending_returns_pending_tasks() {
insert("p1", CeQueueDto.Status.PENDING);
insert("p2", CeQueueDto.Status.PENDING);
insert("p3", CeQueueDto.Status.PENDING);
insert("i1", CeQueueDto.Status.IN_PROGRESS);
insert("i2", CeQueueDto.Status.IN_PROGRESS);
insert("i3", CeQueueDto.Status.IN_PROGRESS);
assertThat(underTest.selectPending(db.getSession()))
.extracting(CeQueueDto::getUuid)
.containsOnly("p1", "p2", "p3");
assertThat(underTest.selectPendingByMinimumExecutionCount(db.getSession(), 1))
.extracting(CeQueueDto::getUuid)
.containsOnly("p2", "p3");
assertThat(underTest.selectPendingByMinimumExecutionCount(db.getSession(), 2))
.extracting(CeQueueDto::getUuid)
.containsOnly("p3");
assertThat(underTest.selectPendingByMinimumExecutionCount(db.getSession(), 3))
.isEmpty();
assertThat(underTest.selectPendingByMinimumExecutionCount(db.getSession(), 3 + Math.abs(new Random().nextInt(20))))
.isEmpty();
}

@Test
public void selectPendingByMinimumExecutionCount_does_not_return_non_pending_tasks() {

}

@Test
@@ -204,14 +182,14 @@ public class CeQueueDaoTest {
@Test
public void resetToPendingForWorker_resets_status_of_non_pending_tasks_only_for_specified_workerUuid() {
long startedAt = 2_099_888L;
CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_1, startedAt);
CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_1, startedAt);
CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_1, startedAt);
CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_1, startedAt);
CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_2, startedAt);
CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_2, startedAt);
CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_2, startedAt);
CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_2, startedAt);
CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt);
CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt);
CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt);
CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt);
CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt);
CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt);
CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt);
CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt);

underTestAlwaysIncreasingSystem2.resetToPendingForWorker(db.getSession(), WORKER_UUID_1);

@@ -228,14 +206,14 @@ public class CeQueueDaoTest {
@Test
public void resetTasksWithUnknownWorkerUUIDs_with_empty_set_resets_status_of_all_pending_tasks() {
long startedAt = 2_099_888L;
CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_1, startedAt);
CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_1, startedAt);
CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_1, startedAt);
CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_1, startedAt);
CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_2, startedAt);
CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_2, startedAt);
CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_2, startedAt);
CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_2, startedAt);
CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt);
CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt);
CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt);
CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt);
CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt);
CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt);
CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt);
CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt);

underTestAlwaysIncreasingSystem2.resetTasksWithUnknownWorkerUUIDs(db.getSession(), ImmutableSet.of());

@@ -252,14 +230,14 @@ public class CeQueueDaoTest {
@Test
public void resetTasksWithUnknownWorkerUUIDs_set_resets_status_of_all_pending_tasks_with_unknown_workers() {
long startedAt = 2_099_888L;
CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_1, startedAt);
CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_1, startedAt);
CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_1, startedAt);
CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_1, startedAt);
CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_2, startedAt);
CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_2, startedAt);
CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_2, startedAt);
CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_2, startedAt);
CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt);
CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt);
CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt);
CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt);
CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt);
CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt);
CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt);
CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt);

underTestAlwaysIncreasingSystem2.resetTasksWithUnknownWorkerUUIDs(db.getSession(), ImmutableSet.of(WORKER_UUID_1, "unknown"));

@@ -276,7 +254,6 @@ public class CeQueueDaoTest {
private void verifyResetByResetTasks(CeQueueDto original) {
CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING).isNotEqualTo(original.getStatus());
assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount());
assertThat(dto.getStartedAt()).isNull();
assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
assertThat(dto.getUpdatedAt()).isGreaterThan(original.getUpdatedAt());
@@ -286,7 +263,6 @@ public class CeQueueDaoTest {
private void verifyResetToPendingForWorker(CeQueueDto original) {
CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING);
assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount());
assertThat(dto.getStartedAt()).isNull();
assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
assertThat(dto.getUpdatedAt()).isGreaterThan(original.getUpdatedAt());
@@ -296,7 +272,6 @@ public class CeQueueDaoTest {
private void verifyUnchangedByResetToPendingForWorker(CeQueueDto original) {
CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
assertThat(dto.getStatus()).isEqualTo(original.getStatus());
assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount());
assertThat(dto.getStartedAt()).isEqualTo(original.getStartedAt());
assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
assertThat(dto.getUpdatedAt()).isEqualTo(original.getUpdatedAt());
@@ -305,11 +280,11 @@ public class CeQueueDaoTest {

@Test
public void peek_none_if_no_pendings() {
assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT).isPresent()).isFalse();
assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();

// not pending, but in progress
insert(TASK_UUID_1, COMPONENT_UUID_1, IN_PROGRESS);
assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT).isPresent()).isFalse();
assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();
}

@Test
@@ -322,25 +297,23 @@ public class CeQueueDaoTest {
verifyCeQueueStatuses(TASK_UUID_1, PENDING, TASK_UUID_2, PENDING);

// peek first one
Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT);
Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1);
assertThat(peek).isPresent();
assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1);
assertThat(peek.get().getStatus()).isEqualTo(IN_PROGRESS);
assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_1);
assertThat(peek.get().getExecutionCount()).isEqualTo(1);
verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, PENDING);

// peek second one
peek = underTest.peek(db.getSession(), WORKER_UUID_2, MAX_EXECUTION_COUNT);
peek = underTest.peek(db.getSession(), WORKER_UUID_2);
assertThat(peek).isPresent();
assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_2);
assertThat(peek.get().getStatus()).isEqualTo(IN_PROGRESS);
assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_2);
assertThat(peek.get().getExecutionCount()).isEqualTo(1);
verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, IN_PROGRESS);

// no more pendings
assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT).isPresent()).isFalse();
assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();
}

@Test
@@ -350,67 +323,21 @@ public class CeQueueDaoTest {
system2.setNow(INIT_TIME + 3_000_000);
insert(TASK_UUID_2, COMPONENT_UUID_1, PENDING);

Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT);
Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1);
assertThat(peek).isPresent();
assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1);
assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_1);
assertThat(peek.get().getExecutionCount()).isEqualTo(1);
verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, PENDING);

// do not peek second task as long as the first one is in progress
peek = underTest.peek(db.getSession(), WORKER_UUID_1, MAX_EXECUTION_COUNT);
peek = underTest.peek(db.getSession(), WORKER_UUID_1);
assertThat(peek.isPresent()).isFalse();

// first one is finished
underTest.deleteByUuid(db.getSession(), TASK_UUID_1);
peek = underTest.peek(db.getSession(), WORKER_UUID_2, MAX_EXECUTION_COUNT);
peek = underTest.peek(db.getSession(), WORKER_UUID_2);
assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_2);
assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_2);
assertThat(peek.get().getExecutionCount()).isEqualTo(1);
}

@Test
public void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount_0() {
peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(0, null);
}

@Test
public void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount_1() {
peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(1, "u0");
}

@Test
public void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount_2() {
peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(2, "u1");
}

@Test
public void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount_3() {
peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(3, "u2");
}

@Test
public void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount_4() {
peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(4, "u3");
}

@Test
public void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount_more_then_4() {
peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(4 + Math.abs(new Random().nextInt(100)), "u3");
}

private void peek_ignores_rows_with_executionCount_greater_or_equal_to_specified_maxExecutionCount(int maxExecutionCount, @Nullable String expected) {
insert("u3", CeQueueDto.Status.PENDING, 3);
insert("u2", CeQueueDto.Status.PENDING, 2);
insert("u1", CeQueueDto.Status.PENDING, 1);
insert("u0", CeQueueDto.Status.PENDING, 0);

Optional<CeQueueDto> dto = underTest.peek(db.getSession(), WORKER_UUID_1, maxExecutionCount);
if (expected == null) {
assertThat(dto.isPresent()).isFalse();
} else {
assertThat(dto.get().getUuid()).isEqualTo(expected);
}
}

@Test
@@ -575,25 +502,23 @@ public class CeQueueDaoTest {
db.commit();
}

private CeQueueDto insert(String uuid, CeQueueDto.Status status, int executionCount) {
private CeQueueDto insert(String uuid, CeQueueDto.Status status) {
CeQueueDto dto = new CeQueueDto();
dto.setUuid(uuid);
dto.setTaskType(CeTaskTypes.REPORT);
dto.setStatus(status);
dto.setSubmitterLogin("henri");
dto.setExecutionCount(executionCount);
underTestAlwaysIncreasingSystem2.insert(db.getSession(), dto);
db.getSession().commit();
return dto;
}

private CeQueueDto insert(String uuid, CeQueueDto.Status status, int executionCount, String workerUuid, Long startedAt) {
private CeQueueDto insert(String uuid, CeQueueDto.Status status, String workerUuid, Long startedAt) {
CeQueueDto dto = new CeQueueDto();
dto.setUuid(uuid);
dto.setTaskType(CeTaskTypes.REPORT);
dto.setStatus(status);
dto.setSubmitterLogin("henri");
dto.setExecutionCount(executionCount);
dto.setWorkerUuid(workerUuid);
dto.setStartedAt(startedAt);
underTestAlwaysIncreasingSystem2.insert(db.getSession(), dto);
@@ -627,11 +552,6 @@ public class CeQueueDaoTest {
});
}

private void verifyCeQueueStatuses(String taskUuid1, CeQueueDto.Status taskStatus1, String taskUuid2, CeQueueDto.Status taskStatus2, String taskUuid3,
CeQueueDto.Status taskStatus3) {
verifyCeQueueStatuses(new String[] {taskUuid1, taskUuid2, taskUuid3}, new CeQueueDto.Status[] {taskStatus1, taskStatus2, taskStatus3});
}

private void verifyCeQueueStatuses(String taskUuid1, CeQueueDto.Status taskStatus1, String taskUuid2, CeQueueDto.Status taskStatus2) {
verifyCeQueueStatuses(new String[] {taskUuid1, taskUuid2}, new CeQueueDto.Status[] {taskStatus1, taskStatus2});
}

+ 0
- 11
server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDtoTest.java View File

@@ -19,7 +19,6 @@
*/
package org.sonar.db.ce;

import java.util.Random;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -112,14 +111,4 @@ public class CeQueueDtoTest {

underTest.setWorkerUuid(str_41_chars);
}

@Test
public void setExecutionCount_throws_IAE_if_value_is_less_than_0() {
int lessThanZero = -1-(Math.abs(new Random().nextInt()));

expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("execution count can't be < 0");

underTest.setExecutionCount(lessThanZero);
}
}

+ 0
- 1
server/sonar-db-dao/src/test/java/org/sonar/db/purge/PurgeDaoTest.java View File

@@ -802,7 +802,6 @@ public class PurgeDaoTest {
.setTaskType("foo")
.setComponentUuid(project.uuid())
.setStatus(Status.PENDING)
.setExecutionCount(0)
.setCreatedAt(1_2323_222L)
.setUpdatedAt(1_2323_222L);
dbClient.ceQueueDao().insert(dbSession, res);

+ 4
- 5
server/sonar-server/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java View File

@@ -53,7 +53,6 @@ import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_COMPONENT
public class CeQueueImplTest {

private static final String WORKER_UUID = "workerUuid";
private static final int MAX_EXECUTION_COUNT = 3;

private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L);

@@ -356,7 +355,7 @@ public class CeQueueImplTest {
@Test
public void fail_to_cancel_if_in_progress() {
submit(CeTaskTypes.REPORT, "PROJECT_1");
CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, MAX_EXECUTION_COUNT).get();
CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().peek(session, WORKER_UUID).get();

expectedException.expect(IllegalStateException.class);
expectedException.expectMessage(startsWith("Task is in progress and can't be canceled"));
@@ -370,7 +369,7 @@ public class CeQueueImplTest {
CeTask pendingTask1 = submit(CeTaskTypes.REPORT, "PROJECT_2");
CeTask pendingTask2 = submit(CeTaskTypes.REPORT, "PROJECT_3");

db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, MAX_EXECUTION_COUNT);
db.getDbClient().ceQueueDao().peek(session, WORKER_UUID);

int canceledCount = underTest.cancelAll();
assertThat(canceledCount).isEqualTo(2);
@@ -397,7 +396,7 @@ public class CeQueueImplTest {
@Test
public void pauseWorkers_marks_workers_as_pausing_if_some_tasks_in_progress() {
submit(CeTaskTypes.REPORT, "PROJECT_1");
db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, MAX_EXECUTION_COUNT);
db.getDbClient().ceQueueDao().peek(session, WORKER_UUID);
// task is in-progress

assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
@@ -418,7 +417,7 @@ public class CeQueueImplTest {
@Test
public void resumeWorkers_resumes_pausing_workers() {
submit(CeTaskTypes.REPORT, "PROJECT_1");
db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, MAX_EXECUTION_COUNT);
db.getDbClient().ceQueueDao().peek(session, WORKER_UUID);
// task is in-progress

underTest.pauseWorkers();

Loading…
Cancel
Save