import java.util.Set;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
+
import org.sonar.api.ce.ComputeEngineSide;
import org.sonar.api.utils.System2;
import org.sonar.api.utils.log.Logger;
import org.sonar.db.DbClient;
import org.sonar.db.DbSession;
import org.sonar.db.ce.CeActivityDto;
-import org.sonar.db.ce.CeQueueDao;
import org.sonar.db.ce.CeQueueDto;
import org.sonar.db.ce.CeTaskCharacteristicDto;
import org.sonar.db.component.ComponentDto;
return Optional.empty();
}
try (DbSession dbSession = dbClient.openSession(false)) {
- CeQueueDao ceQueueDao = dbClient.ceQueueDao();
- int i = ceQueueDao.resetToPendingForWorker(dbSession, workerUuid);
- if (i > 0) {
- dbSession.commit();
- LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid);
- }
+ resetNotPendingTasks(workerUuid, dbSession);
Optional<CeQueueDto> opt = nextPendingTaskPicker.findPendingTask(workerUuid, dbSession, excludeIndexationJob);
if (opt.isEmpty()) {
return Optional.empty();
}
}
+ private void resetNotPendingTasks(String workerUuid, DbSession dbSession) {
+ List<CeQueueDto> notPendingTasks = dbClient.ceQueueDao().selectNotPendingForWorker(dbSession, workerUuid);
+ if (!notPendingTasks.isEmpty()) {
+ for (CeQueueDto pendingTask : notPendingTasks) {
+ dbClient.ceQueueDao().resetToPendingByUuid(dbSession, pendingTask.getUuid());
+ }
+ dbSession.commit();
+ LOG.debug("{} in progress tasks reset for worker uuid {}", notPendingTasks.size(), workerUuid);
+ }
+ }
+
@Override
public void remove(CeTask task, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
checkArgument(error == null || status == CeActivityDto.Status.FAILED, "Error can be provided only when status is FAILED");
private static final String WORKER_UUID_1 = "worker uuid 1";
private static final String WORKER_UUID_2 = "worker uuid 2";
- private TestSystem2 system2 = new TestSystem2().setNow(INIT_TIME);
+ private final TestSystem2 system2 = new TestSystem2().setNow(INIT_TIME);
@Rule
public DbTester db = DbTester.create(system2);
- private System2 mockedSystem2 = mock(System2.class);
- private System2 alwaysIncreasingSystem2 = new AlwaysIncreasingSystem2();
+ private final System2 mockedSystem2 = mock(System2.class);
+ private final System2 alwaysIncreasingSystem2 = new AlwaysIncreasingSystem2();
- private CeQueueDao underTest = new CeQueueDao(system2);
- private CeQueueDao underTestWithSystem2Mock = new CeQueueDao(mockedSystem2);
- private CeQueueDao underTestAlwaysIncreasingSystem2 = new CeQueueDao(alwaysIncreasingSystem2);
+ private final CeQueueDao underTest = new CeQueueDao(system2);
+ private final CeQueueDao underTestWithSystem2Mock = new CeQueueDao(mockedSystem2);
+ private final CeQueueDao underTestAlwaysIncreasingSystem2 = new CeQueueDao(alwaysIncreasingSystem2);
@Test
public void insert_populates_createdAt_and_updateAt_from_System2_with_same_value_if_any_is_not_set() {
@Test
public void test_delete_with_expected_status() {
insertPending(TASK_UUID_1, MAIN_COMPONENT_UUID_1);
- insertInProgress(TASK_UUID_2);
+ insertInProgress(TASK_UUID_2, "workerUuid", System2.INSTANCE.now());
int deletedCount = underTest.deleteByUuid(db.getSession(), "UNKNOWN", null);
assertThat(deletedCount).isZero();
assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_2)).isEmpty();
}
+ @Test
+ public void selectNotPendingForWorker_return_non_pending_tasks_for_specified_workerUuid() {
+ long startedAt = alwaysIncreasingSystem2.now();
+ insertPending("u1");
+ CeQueueDto inProgressTaskWorker1 = insertInProgress("u2", WORKER_UUID_1, startedAt);
+ insertInProgress("o2", WORKER_UUID_2, startedAt);
+
+ List<CeQueueDto> notPendingForWorker = underTestAlwaysIncreasingSystem2.selectNotPendingForWorker(db.getSession(), WORKER_UUID_1);
+
+ assertThat(notPendingForWorker).extracting(CeQueueDto::getUuid)
+ .contains(inProgressTaskWorker1.getUuid());
+ }
+
+ @Test
+ public void resetToPendingByUuid_resets_status_of_specific_task() {
+ long task1startedAt = alwaysIncreasingSystem2.now();
+ CeQueueDto task1 = insertInProgress("uuid-1", "workerUuid", task1startedAt);
+ CeQueueDto task2 = insertInProgress("uuid-2", "workerUuid", alwaysIncreasingSystem2.now());
+
+ underTestAlwaysIncreasingSystem2.resetToPendingByUuid(db.getSession(), task1.getUuid());
+
+ verifyResetToPendingForWorker(task1, task1.getWorkerUuid(), task1startedAt);
+ verifyUnchangedByResetToPendingForWorker(task2);
+ }
+
@Test
public void resetToPendingForWorker_resets_status_of_non_pending_tasks_only_for_specified_workerUuid() {
CeQueueDto[] worker1 = {insertPending("u1"), insertPending("u2"), insertPending("u3"), insertPending("u4")};
makeInProgress(WORKER_UUID_2, startedAt, worker2[0]);
makeInProgress(WORKER_UUID_2, startedAt, worker2[3]);
- underTestAlwaysIncreasingSystem2.resetToPendingForWorker(db.getSession(), WORKER_UUID_1);
+ List<CeQueueDto> notPendingForWorker = underTestAlwaysIncreasingSystem2.selectNotPendingForWorker(db.getSession(), WORKER_UUID_1);
+ for (CeQueueDto ceQueueDto : notPendingForWorker) {
+ underTestAlwaysIncreasingSystem2.resetToPendingByUuid(db.getSession(), ceQueueDto.getUuid());
+ }
verifyResetToPendingForWorker(worker1[0], WORKER_UUID_1, startedAt);
verifyUnchangedByResetToPendingForWorker(worker1[1]);
return dto;
}
- private CeQueueDto insertInProgress(String uuid) {
- CeQueueDto ceQueueDto = insertPending(uuid);
- CeQueueTesting.makeInProgress(db.getSession(), "workerUuid", System2.INSTANCE.now(), ceQueueDto);
- return underTest.selectByUuid(db.getSession(), uuid).get();
+ private CeQueueDto insertInProgress(String taskUuid, String workerUuid, long now) {
+ CeQueueDto ceQueueDto = insertPending(taskUuid);
+ CeQueueTesting.makeInProgress(db.getSession(), workerUuid, now, ceQueueDto);
+ return underTest.selectByUuid(db.getSession(), taskUuid).get();
}
private void insertCharacteristic(String key, String value, String uuid, String taskUuid) {