aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java3
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java11
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java2
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java15
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java120
-rw-r--r--server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java4
-rw-r--r--server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDto.java13
-rw-r--r--server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java5
-rw-r--r--server/sonar-db-dao/src/main/java/org/sonar/db/ce/UpdateIf.java5
-rw-r--r--server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml18
-rw-r--r--server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeActivityDaoTest.java29
-rw-r--r--server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java243
-rw-r--r--server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDtoTest.java18
-rw-r--r--server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueTesting.java31
-rw-r--r--server/sonar-db-dao/src/test/java/org/sonar/db/ce/UpdateIfTest.java91
-rw-r--r--server/sonar-server/src/test/java/org/sonar/server/ce/ws/TaskFormatterTest.java17
16 files changed, 376 insertions, 249 deletions
diff --git a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java
index 01b113bd64d..34a8c01f677 100644
--- a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java
+++ b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java
@@ -154,12 +154,11 @@ public class CeQueueImpl implements CeQueue {
dto.setComponentUuid(submission.getComponentUuid());
dto.setStatus(PENDING);
dto.setSubmitterUuid(submission.getSubmitterUuid());
- dto.setStartedAt(null);
dbClient.ceQueueDao().insert(dbSession, dto);
return dto;
}
- protected CeTask loadTask(DbSession dbSession, CeQueueDto dto) {
+ CeTask loadTask(DbSession dbSession, CeQueueDto dto) {
String componentUuid = dto.getComponentUuid();
if (componentUuid == null) {
return new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid()).apply(dto);
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java
index 6c34cac9baf..b266d4a94ad 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java
@@ -56,9 +56,11 @@ public class CeCleaningSchedulerImpl implements CeCleaningScheduler {
Lock ceCleaningJobLock = ceDistributedInformation.acquireCleanJobLock();
// If we cannot lock that means that another job is running
+ // So we skip resetting and cancelling tasks in queue
if (ceCleaningJobLock.tryLock()) {
try {
resetTasksWithUnknownWorkerUUIDs();
+ cancelWornOuts();
} finally {
ceCleaningJobLock.unlock();
}
@@ -73,4 +75,13 @@ public class CeCleaningSchedulerImpl implements CeCleaningScheduler {
LOG.warn("Failed to reset tasks with unknown worker UUIDs", e);
}
}
+
+ private void cancelWornOuts() {
+ try {
+ LOG.debug("Cancelling any worn out task");
+ internalCeQueue.cancelWornOuts();
+ } catch (Exception e) {
+ LOG.warn("Failed to cancel worn out tasks", e);
+ }
+ }
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java
index 9932e2e3986..255e94d7011 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java
@@ -68,5 +68,7 @@ public interface InternalCeQueue extends CeQueue {
*/
void remove(CeTask task, Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error);
+ void cancelWornOuts();
+
void resetTasksWithUnknownWorkerUUIDs(Set<String> knownWorkerUUIDs);
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
index 8f9b6eb34a5..743d71336e8 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
@@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
import javax.annotation.CheckForNull;
@@ -78,6 +79,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
CeQueueDao ceQueueDao = dbClient.ceQueueDao();
int i = ceQueueDao.resetToPendingForWorker(dbSession, workerUuid);
if (i > 0) {
+ dbSession.commit();
LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid);
}
Optional<CeQueueDto> dto = ceQueueDao.peek(dbSession, workerUuid);
@@ -165,6 +167,19 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
}
@Override
+ public void cancelWornOuts() {
+ try (DbSession dbSession = dbClient.openSession(false)) {
+ List<CeQueueDto> wornOutTasks = dbClient.ceQueueDao().selectWornout(dbSession);
+ wornOutTasks.forEach(queueDto -> {
+ CeActivityDto activityDto = new CeActivityDto(queueDto);
+ activityDto.setStatus(CeActivityDto.Status.CANCELED);
+ updateQueueStatus(CeActivityDto.Status.CANCELED, activityDto);
+ remove(dbSession, queueDto, activityDto);
+ });
+ }
+ }
+
+ @Override
public void resetTasksWithUnknownWorkerUUIDs(Set<String> knownWorkerUUIDs) {
try (DbSession dbSession = dbClient.openSession(false)) {
dbClient.ceQueueDao().resetTasksWithUnknownWorkerUUIDs(dbSession, knownWorkerUUIDs);
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
index ee5680b24b7..74c38b92df5 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
@@ -43,6 +43,7 @@ import org.sonar.db.DbSession;
import org.sonar.db.DbTester;
import org.sonar.db.ce.CeActivityDto;
import org.sonar.db.ce.CeQueueDto;
+import org.sonar.db.ce.CeQueueTesting;
import org.sonar.db.ce.CeTaskTypes;
import org.sonar.db.component.ComponentDto;
import org.sonar.db.component.ComponentTesting;
@@ -259,11 +260,11 @@ public class InternalCeQueueImplTest {
@Test
public void remove_copies_workerUuid() {
- db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
+ CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
.setUuid("uuid")
.setTaskType("foo")
- .setStatus(CeQueueDto.Status.PENDING)
- .setWorkerUuid("Dustin"));
+ .setStatus(CeQueueDto.Status.PENDING));
+ makeInProgress(ceQueueDto, "Dustin");
db.commit();
underTest.remove(new CeTask.Builder()
@@ -316,18 +317,15 @@ public class InternalCeQueueImplTest {
}
@Test
- public void peek_overrides_workerUuid_to_argument() {
- db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
+ public void peek_ignores_in_progress_tasks() {
+ CeQueueDto dto = db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
.setUuid("uuid")
.setTaskType("foo")
- .setStatus(CeQueueDto.Status.PENDING)
- .setWorkerUuid("must be overriden"));
+ .setStatus(CeQueueDto.Status.PENDING));
+ makeInProgress(dto, "foo");
db.commit();
- underTest.peek(WORKER_UUID_1);
-
- CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().selectByUuid(session, "uuid").get();
- assertThat(ceQueueDto.getWorkerUuid()).isEqualTo(WORKER_UUID_1);
+ assertThat(underTest.peek(WORKER_UUID_1)).isEmpty();
}
@Test
@@ -352,8 +350,8 @@ public class InternalCeQueueImplTest {
@Test
public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_updates_updatedAt() {
- insertPending("u0", "doesn't matter"); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB
- CeQueueDto u1 = insertPending("u1", WORKER_UUID_1);// will be picked-because older than any of the reset ones
+ insertPending("u0"); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB
+ CeQueueDto u1 = insertPending("u1");// will be picked-because older than any of the reset ones
CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_1);// will be reset
assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u0");
@@ -364,7 +362,7 @@ public class InternalCeQueueImplTest {
@Test
public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_only_this_uuid() {
- insertPending("u0", "doesn't matter"); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB
+ insertPending("u0"); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB
CeQueueDto u1 = insertInProgress("u1", WORKER_UUID_1);
CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_2);
CeQueueDto u3 = insertInProgress("u3", WORKER_UUID_1);
@@ -396,19 +394,18 @@ public class InternalCeQueueImplTest {
CeQueueDto dto = new CeQueueDto()
.setUuid(uuid)
.setTaskType("foo")
- .setStatus(CeQueueDto.Status.IN_PROGRESS)
- .setWorkerUuid(workerUuid);
+ .setStatus(CeQueueDto.Status.PENDING);
db.getDbClient().ceQueueDao().insert(session, dto);
+ makeInProgress(dto, workerUuid);
db.commit();
- return dto;
+ return db.getDbClient().ceQueueDao().selectByUuid(session, uuid).get();
}
- private CeQueueDto insertPending(String uuid, String workerUuid) {
+ private CeQueueDto insertPending(String uuid) {
CeQueueDto dto = new CeQueueDto()
.setUuid(uuid)
.setTaskType("foo")
- .setStatus(CeQueueDto.Status.PENDING)
- .setWorkerUuid(workerUuid);
+ .setStatus(CeQueueDto.Status.PENDING);
db.getDbClient().ceQueueDao().insert(session, dto);
db.commit();
return dto;
@@ -427,21 +424,6 @@ public class InternalCeQueueImplTest {
}
@Test
- public void cancel_copies_workerUuid() {
- CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
- .setUuid("uuid")
- .setTaskType("foo")
- .setStatus(CeQueueDto.Status.PENDING)
- .setWorkerUuid("Dustin"));
- db.commit();
-
- underTest.cancel(db.getSession(), ceQueueDto);
-
- CeActivityDto dto = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), "uuid").get();
- assertThat(dto.getWorkerUuid()).isEqualTo("Dustin");
- }
-
- @Test
public void fail_to_cancel_if_in_progress() {
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
underTest.peek(WORKER_UUID_2);
@@ -473,25 +455,19 @@ public class InternalCeQueueImplTest {
@Test
public void resetTasksWithUnknownWorkerUUIDs_reset_only_in_progress_tasks() {
- CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, null);
- CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, "worker1");
- CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, null);
- CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, "worker2");
- CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, null);
- CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, "worker1");
- CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, "worker2");
- CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, "worker3");
+ CeQueueDto u1 = insertCeQueueDto("u1");
+ CeQueueDto u2 = insertCeQueueDto("u2");
+ CeQueueDto u6 = insertInProgress("u6", "worker1");
+ CeQueueDto u7 = insertInProgress("u7", "worker2");
+ CeQueueDto u8 = insertInProgress("u8", "worker3");
underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of("worker2", "worker3"));
// Pending tasks must not be modified even if a workerUUID is not present
verifyUnmodified(u1);
verifyUnmodified(u2);
- verifyUnmodified(u3);
- verifyUnmodified(u4);
// Unknown worker : null, "worker1"
- verifyReset(u5);
verifyReset(u6);
// Known workers : "worker2", "worker3"
@@ -501,25 +477,19 @@ public class InternalCeQueueImplTest {
@Test
public void resetTasksWithUnknownWorkerUUIDs_with_empty_set_will_reset_all_in_progress_tasks() {
- CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, null);
- CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, "worker1");
- CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, null);
- CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, "worker2");
- CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, null);
- CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, "worker1");
- CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, "worker2");
- CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, "worker3");
+ CeQueueDto u1 = insertCeQueueDto("u1");
+ CeQueueDto u2 = insertCeQueueDto("u2");
+ CeQueueDto u6 = insertInProgress("u6", "worker1");
+ CeQueueDto u7 = insertInProgress("u7", "worker2");
+ CeQueueDto u8 = insertInProgress("u8", "worker3");
underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of());
// Pending tasks must not be modified even if a workerUUID is not present
verifyUnmodified(u1);
verifyUnmodified(u2);
- verifyUnmodified(u3);
- verifyUnmodified(u4);
// Unknown worker : null, "worker1"
- verifyReset(u5);
verifyReset(u6);
verifyReset(u7);
verifyReset(u8);
@@ -527,25 +497,19 @@ public class InternalCeQueueImplTest {
@Test
public void resetTasksWithUnknownWorkerUUIDs_with_worker_without_tasks_will_reset_all_in_progress_tasks() {
- CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, null);
- CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, "worker1");
- CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, null);
- CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, "worker2");
- CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, null);
- CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, "worker1");
- CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, "worker2");
- CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, "worker3");
+ CeQueueDto u1 = insertCeQueueDto("u1");
+ CeQueueDto u2 = insertCeQueueDto("u2");
+ CeQueueDto u6 = insertInProgress("u6", "worker1");
+ CeQueueDto u7 = insertInProgress("u7", "worker2");
+ CeQueueDto u8 = insertInProgress("u8", "worker3");
underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of("worker1000", "worker1001"));
// Pending tasks must not be modified even if a workerUUID is not present
verifyUnmodified(u1);
verifyUnmodified(u2);
- verifyUnmodified(u3);
- verifyUnmodified(u4);
// Unknown worker : null, "worker1"
- verifyReset(u5);
verifyReset(u6);
verifyReset(u7);
verifyReset(u8);
@@ -560,8 +524,7 @@ public class InternalCeQueueImplTest {
assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING).isNotEqualTo(original.getStatus());
// UpdatedAt must have been updated
assertThat(dto.getUpdatedAt()).isNotEqualTo(original.getUpdatedAt());
- // StartedAt must be null
- assertThat(dto.getStartedAt()).isNull();
+ assertThat(dto.getStartedAt()).isEqualTo(original.getStartedAt());
// WorkerUuid must be null
assertThat(dto.getWorkerUuid()).isNull();
}
@@ -573,19 +536,11 @@ public class InternalCeQueueImplTest {
assertThat(dto.getUpdatedAt()).isEqualTo(original.getUpdatedAt());
}
- private void verifyCanceled(CeQueueDto original) {
- assertThat(db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid())).isEmpty();
- CeActivityDto dto = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), original.getUuid()).get();
- assertThat(dto.getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
- assertThat(dto.getWorkerUuid()).isEqualTo(original.getWorkerUuid());
- }
-
- private CeQueueDto insertCeQueueDto(String uuid, CeQueueDto.Status status, String workerUuid) {
+ private CeQueueDto insertCeQueueDto(String uuid) {
CeQueueDto dto = new CeQueueDto()
.setUuid(uuid)
.setTaskType("foo")
- .setStatus(status)
- .setWorkerUuid(workerUuid);
+ .setStatus(CeQueueDto.Status.PENDING);
db.getDbClient().ceQueueDao().insert(db.getSession(), dto);
db.commit();
return dto;
@@ -652,6 +607,11 @@ public class InternalCeQueueImplTest {
return componentDto;
}
+ private CeQueueDto makeInProgress(CeQueueDto ceQueueDto, String workerUuid) {
+ CeQueueTesting.makeInProgress(session, workerUuid, system2.now(), ceQueueDto);
+ return db.getDbClient().ceQueueDao().selectByUuid(session, ceQueueDto.getUuid()).get();
+ }
+
private static String stacktraceToString(Throwable error) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
error.printStackTrace(new PrintStream(out));
diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java
index c5e47fde5f5..308a90a5cfd 100644
--- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java
+++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java
@@ -89,6 +89,10 @@ public class CeQueueDao implements Dao {
return mapper(dbSession).selectPending();
}
+ public List<CeQueueDto> selectWornout(DbSession dbSession) {
+ return mapper(dbSession).selectWornout();
+ }
+
public void resetTasksWithUnknownWorkerUUIDs(DbSession dbSession, Set<String> knownWorkerUUIDs) {
if (knownWorkerUUIDs.isEmpty()) {
mapper(dbSession).resetAllInProgressTasks(system2.now());
diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDto.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDto.java
index 640b9c217e4..b31c0bece83 100644
--- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDto.java
+++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDto.java
@@ -98,10 +98,11 @@ public class CeQueueDto {
return workerUuid;
}
- public CeQueueDto setWorkerUuid(@Nullable String workerUuid) {
- checkArgument(workerUuid == null || workerUuid.length() <= 40, "worker uuid is too long: %s", workerUuid);
+ /**
+ * Accessed by MyBatis through reflexion. Field is otherwise read-only.
+ */
+ private void setWorkerUuid(@Nullable String workerUuid) {
this.workerUuid = workerUuid;
- return this;
}
@CheckForNull
@@ -109,9 +110,11 @@ public class CeQueueDto {
return startedAt;
}
- public CeQueueDto setStartedAt(@Nullable Long l) {
+ /**
+ * Accessed by MyBatis through reflexion. Field is otherwise read-only.
+ */
+ private void setStartedAt(@Nullable Long l) {
this.startedAt = l;
- return this;
}
public long getCreatedAt() {
diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java
index 711e3d0bf84..9cccbb2466c 100644
--- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java
+++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java
@@ -48,6 +48,11 @@ public interface CeQueueMapper {
List<CeQueueDto> selectPending();
/**
+ * Select all pending tasks which have already been started.
+ */
+ List<CeQueueDto> selectWornout();
+
+ /**
* Select all tasks whose worker UUID is not present in {@code knownWorkerUUIDs}
*/
void resetTasksWithUnknownWorkerUUIDs(@Param("knownWorkerUUIDs") List<String> knownWorkerUUIDs, @Param("updatedAt") long updatedAt);
diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/UpdateIf.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/UpdateIf.java
index 9b6e11f1704..1c1df6ec8a8 100644
--- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/UpdateIf.java
+++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/UpdateIf.java
@@ -23,6 +23,7 @@ import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
+import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
final class UpdateIf {
@@ -38,7 +39,8 @@ final class UpdateIf {
private final long updatedAt;
NewProperties(CeQueueDto.Status status, @Nullable String workerUuid,
- Long startedAt, long updatedAt) {
+ long startedAt, long updatedAt) {
+ checkArgument(workerUuid == null || workerUuid.length() <= 40, "worker uuid is too long: %s", workerUuid);
this.status = requireNonNull(status, "status can't be null");
this.workerUuid = workerUuid;
this.startedAt = startedAt;
@@ -54,7 +56,6 @@ final class UpdateIf {
return workerUuid;
}
- @CheckForNull
public Long getStartedAt() {
return startedAt;
}
diff --git a/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml b/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml
index b2b8fb3e403..602e386ead5 100644
--- a/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml
+++ b/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml
@@ -169,6 +169,7 @@
ce_queue cq
where
cq.status='PENDING'
+ and cq.started_at is null
and not exists (
select
1
@@ -195,6 +196,16 @@
cq.status = 'PENDING'
</select>
+ <select id="selectWornout" resultType="org.sonar.db.ce.CeQueueDto">
+ select
+ <include refid="columns"/>
+ from
+ ce_queue cq
+ where
+ cq.status = 'PENDING'
+ and cq.started_at is not null
+ </select>
+
<insert id="insert" parameterType="org.sonar.db.ce.CeQueueDto" useGeneratedKeys="false">
insert into ce_queue
(
@@ -203,9 +214,7 @@
component_uuid,
status,
submitter_uuid,
- worker_uuid,
execution_count,
- started_at,
created_at,
updated_at
)
@@ -215,9 +224,7 @@
#{componentUuid,jdbcType=VARCHAR},
#{status,jdbcType=VARCHAR},
#{submitterUuid,jdbcType=VARCHAR},
- #{workerUuid,jdbcType=VARCHAR},
0,
- #{startedAt,jdbcType=BIGINT},
#{createdAt,jdbcType=BIGINT},
#{updatedAt,jdbcType=BIGINT}
)
@@ -226,7 +233,6 @@
<update id="resetToPendingForWorker">
update ce_queue set
status='PENDING',
- started_at=NULL,
updated_at=#{updatedAt,jdbcType=BIGINT}
where
status &lt;&gt; 'PENDING'
@@ -255,7 +261,6 @@
update ce_queue set
status='PENDING',
worker_uuid=NULL,
- started_at=NULL,
updated_at=#{updatedAt,jdbcType=BIGINT}
where
status = 'IN_PROGRESS'
@@ -272,7 +277,6 @@
update ce_queue set
status='PENDING',
worker_uuid=NULL,
- started_at=NULL,
updated_at=#{updatedAt,jdbcType=BIGINT}
where
status = 'IN_PROGRESS'
diff --git a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeActivityDaoTest.java b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeActivityDaoTest.java
index f03acf46547..f24e8b5a14a 100644
--- a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeActivityDaoTest.java
+++ b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeActivityDaoTest.java
@@ -41,6 +41,8 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.sonar.db.Pagination.forPage;
import static org.sonar.db.ce.CeActivityDto.Status.FAILED;
import static org.sonar.db.ce.CeActivityDto.Status.SUCCESS;
+import static org.sonar.db.ce.CeQueueDto.Status.PENDING;
+import static org.sonar.db.ce.CeQueueTesting.makeInProgress;
import static org.sonar.db.ce.CeTaskTypes.REPORT;
public class CeActivityDaoTest {
@@ -64,7 +66,7 @@ public class CeActivityDaoTest {
assertThat(dto.getComponentUuid()).isEqualTo("PROJECT_1");
assertThat(dto.getStatus()).isEqualTo(CeActivityDto.Status.SUCCESS);
assertThat(dto.getSubmitterUuid()).isEqualTo("submitter uuid");
- assertThat(dto.getSubmittedAt()).isEqualTo(1_300_000_000_000L);
+ assertThat(dto.getSubmittedAt()).isEqualTo(1_450_000_000_000L);
assertThat(dto.getWorkerUuid()).isEqualTo("worker uuid");
assertThat(dto.getIsLast()).isTrue();
assertThat(dto.getIsLastKey()).isEqualTo("REPORTPROJECT_1");
@@ -344,15 +346,20 @@ public class CeActivityDaoTest {
}
private CeActivityDto createActivityDto(String uuid, String type, String componentUuid, CeActivityDto.Status status) {
- CeQueueDto queueDto = new CeQueueDto();
- queueDto.setUuid(uuid);
- queueDto.setTaskType(type);
- queueDto.setComponentUuid(componentUuid);
- queueDto.setSubmitterUuid("submitter uuid");
- queueDto.setWorkerUuid("worker uuid");
- queueDto.setCreatedAt(1_300_000_000_000L);
+ CeQueueDto creating = new CeQueueDto();
+ creating.setUuid(uuid);
+ creating.setStatus(PENDING);
+ creating.setTaskType(type);
+ creating.setComponentUuid(componentUuid);
+ creating.setSubmitterUuid("submitter uuid");
+ creating.setCreatedAt(1_300_000_000_000L);
- CeActivityDto dto = new CeActivityDto(queueDto);
+ db.getDbClient().ceQueueDao().insert(dbSession, creating);
+ makeInProgress(dbSession, "worker uuid", 1_400_000_000_000L, creating);
+
+ CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().selectByUuid(dbSession, uuid).get();
+
+ CeActivityDto dto = new CeActivityDto(ceQueueDto);
dto.setStatus(status);
dto.setStartedAt(1_500_000_000_000L);
dto.setExecutedAt(1_500_000_000_500L);
@@ -385,8 +392,8 @@ public class CeActivityDaoTest {
private List<String> selectPageOfUuids(Pagination pagination) {
return underTest.selectByQuery(db.getSession(), new CeTaskQuery(), pagination).stream()
- .map(CeActivityToUuid.INSTANCE::apply)
- .collect(MoreCollectors.toList());
+ .map(CeActivityToUuid.INSTANCE::apply)
+ .collect(MoreCollectors.toList());
}
private enum CeActivityToUuid implements Function<CeActivityDto, String> {
diff --git a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java
index a065ed4855b..6ae433012a7 100644
--- a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java
+++ b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java
@@ -42,11 +42,13 @@ import static com.google.common.collect.Lists.newArrayList;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
+import static org.assertj.core.groups.Tuple.tuple;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS;
import static org.sonar.db.ce.CeQueueDto.Status.PENDING;
import static org.sonar.db.ce.CeQueueTesting.newCeQueueDto;
+import static org.sonar.db.ce.CeQueueTesting.reset;
public class CeQueueDaoTest {
private static final long INIT_TIME = 1_450_000_000_000L;
@@ -66,10 +68,11 @@ public class CeQueueDaoTest {
public DbTester db = DbTester.create(system2);
private System2 mockedSystem2 = mock(System2.class);
+ private System2 alwaysIncreasingSystem2 = new AlwaysIncreasingSystem2();
private CeQueueDao underTest = new CeQueueDao(system2);
private CeQueueDao underTestWithSystem2Mock = new CeQueueDao(mockedSystem2);
- private CeQueueDao underTestAlwaysIncreasingSystem2 = new CeQueueDao(new AlwaysIncreasingSystem2());
+ private CeQueueDao underTestAlwaysIncreasingSystem2 = new CeQueueDao(alwaysIncreasingSystem2);
@Test
public void insert_populates_createdAt_and_updateAt_from_System2_with_same_value_if_any_is_not_set() {
@@ -78,8 +81,7 @@ public class CeQueueDaoTest {
.setTaskType(CeTaskTypes.REPORT)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(PENDING)
- .setSubmitterUuid(SUBMITTER_LOGIN)
- .setWorkerUuid(WORKER_UUID_1);
+ .setSubmitterUuid(SUBMITTER_LOGIN);
mockSystem2ForSingleCall(now);
underTestWithSystem2Mock.insert(db.getSession(), dto.setUuid(TASK_UUID_1));
@@ -100,7 +102,7 @@ public class CeQueueDaoTest {
assertThat(saved.getComponentUuid()).isEqualTo(COMPONENT_UUID_1);
assertThat(saved.getStatus()).isEqualTo(PENDING);
assertThat(saved.getSubmitterUuid()).isEqualTo(SUBMITTER_LOGIN);
- assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID_1);
+ assertThat(saved.getWorkerUuid()).isNull();
assertThat(saved.getCreatedAt()).isEqualTo(now);
assertThat(saved.getUpdatedAt()).isEqualTo(now);
assertThat(saved.getStartedAt()).isNull();
@@ -111,7 +113,7 @@ public class CeQueueDaoTest {
assertThat(saved.getComponentUuid()).isEqualTo(COMPONENT_UUID_1);
assertThat(saved.getStatus()).isEqualTo(PENDING);
assertThat(saved.getSubmitterUuid()).isEqualTo(SUBMITTER_LOGIN);
- assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID_1);
+ assertThat(saved.getWorkerUuid()).isNull();
assertThat(saved.getCreatedAt()).isEqualTo(6_888_777L);
assertThat(saved.getUpdatedAt()).isEqualTo(8_000_999L);
assertThat(saved.getStartedAt()).isNull();
@@ -119,7 +121,7 @@ public class CeQueueDaoTest {
@Test
public void test_selectByUuid() {
- insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING);
+ insertPending(TASK_UUID_1, COMPONENT_UUID_1);
assertThat(underTest.selectByUuid(db.getSession(), "TASK_UNKNOWN").isPresent()).isFalse();
CeQueueDto saved = underTest.selectByUuid(db.getSession(), TASK_UUID_1).get();
@@ -136,9 +138,9 @@ public class CeQueueDaoTest {
@Test
public void test_selectByComponentUuid() {
- insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING);
- insert(TASK_UUID_2, COMPONENT_UUID_1, PENDING);
- insert(TASK_UUID_3, "PROJECT_2", PENDING);
+ insertPending(TASK_UUID_1, COMPONENT_UUID_1);
+ insertPending(TASK_UUID_2, COMPONENT_UUID_1);
+ insertPending(TASK_UUID_3, "PROJECT_2");
assertThat(underTest.selectByComponentUuid(db.getSession(), "UNKNOWN")).isEmpty();
assertThat(underTest.selectByComponentUuid(db.getSession(), COMPONENT_UUID_1)).extracting("uuid").containsOnly(TASK_UUID_1, TASK_UUID_2);
@@ -147,21 +149,21 @@ public class CeQueueDaoTest {
@Test
public void test_selectAllInAscOrder() {
- insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING);
- insert(TASK_UUID_2, COMPONENT_UUID_1, PENDING);
- insert(TASK_UUID_3, "PROJECT_2", PENDING);
+ insertPending(TASK_UUID_1, COMPONENT_UUID_1);
+ insertPending(TASK_UUID_2, COMPONENT_UUID_1);
+ insertPending(TASK_UUID_3, "PROJECT_2");
assertThat(underTest.selectAllInAscOrder(db.getSession())).extracting("uuid").containsOnly(TASK_UUID_1, TASK_UUID_2, TASK_UUID_3);
}
@Test
public void selectPending_returns_pending_tasks() {
- insert("p1", CeQueueDto.Status.PENDING);
- insert("p2", CeQueueDto.Status.PENDING);
- insert("p3", CeQueueDto.Status.PENDING);
- insert("i1", CeQueueDto.Status.IN_PROGRESS);
- insert("i2", CeQueueDto.Status.IN_PROGRESS);
- insert("i3", CeQueueDto.Status.IN_PROGRESS);
+ insertPending("p1");
+ insertPending("p2");
+ insertPending("p3");
+ makeInProgress("w1", alwaysIncreasingSystem2.now(), insertPending("i1"));
+ makeInProgress("w1", alwaysIncreasingSystem2.now(), insertPending("i2"));
+ makeInProgress("w1", alwaysIncreasingSystem2.now(), insertPending("i3"));
assertThat(underTest.selectPending(db.getSession()))
.extracting(CeQueueDto::getUuid)
@@ -169,8 +171,22 @@ public class CeQueueDaoTest {
}
@Test
+ public void selectWornout_returns_task_pending_with_a_non_null_startedAt() {
+ insertPending("p1");
+ makeInProgress("w1", alwaysIncreasingSystem2.now(), insertPending("i1"));
+ CeQueueDto resetDto = makeInProgress("w1", alwaysIncreasingSystem2.now(), insertPending("i2"));
+ makeInProgress("w1", alwaysIncreasingSystem2.now(), insertPending("i3"));
+ reset(db.getSession(), alwaysIncreasingSystem2.now(), resetDto);
+
+ List<CeQueueDto> ceQueueDtos = underTest.selectWornout(db.getSession());
+ assertThat(ceQueueDtos)
+ .extracting(CeQueueDto::getStatus, CeQueueDto::getUuid)
+ .containsOnly(tuple(PENDING, resetDto.getUuid()));
+ }
+
+ @Test
public void test_delete() {
- insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING);
+ insertPending(TASK_UUID_1, COMPONENT_UUID_1);
underTest.deleteByUuid(db.getSession(), "UNKNOWN");
assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_1)).isPresent();
@@ -181,92 +197,91 @@ public class CeQueueDaoTest {
@Test
public void resetToPendingForWorker_resets_status_of_non_pending_tasks_only_for_specified_workerUuid() {
- long startedAt = 2_099_888L;
- CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt);
- CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt);
- CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt);
- CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt);
- CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt);
- CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt);
- CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt);
- CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt);
+ CeQueueDto[] worker1 = {insertPending("u1"), insertPending("u2"), insertPending("u3"), insertPending("u4")};
+ CeQueueDto[] worker2 = {insertPending("o1"), insertPending("o2"), insertPending("o3"), insertPending("o4")};
+ long startedAt = alwaysIncreasingSystem2.now();
+ makeInProgress(WORKER_UUID_1, startedAt, worker1[0]);
+ makeInProgress(WORKER_UUID_1, startedAt, worker1[3]);
+ makeInProgress(WORKER_UUID_2, startedAt, worker2[0]);
+ makeInProgress(WORKER_UUID_2, startedAt, worker2[3]);
underTestAlwaysIncreasingSystem2.resetToPendingForWorker(db.getSession(), WORKER_UUID_1);
- verifyResetToPendingForWorker(u1);
- verifyUnchangedByResetToPendingForWorker(u2);
- verifyUnchangedByResetToPendingForWorker(u3);
- verifyResetToPendingForWorker(u4);
- verifyUnchangedByResetToPendingForWorker(o1);
- verifyUnchangedByResetToPendingForWorker(o2);
- verifyUnchangedByResetToPendingForWorker(o3);
- verifyUnchangedByResetToPendingForWorker(o4);
+ verifyResetToPendingForWorker(worker1[0], WORKER_UUID_1, startedAt);
+ verifyUnchangedByResetToPendingForWorker(worker1[1]);
+ verifyUnchangedByResetToPendingForWorker(worker1[2]);
+ verifyResetToPendingForWorker(worker1[3], WORKER_UUID_1, startedAt);
+ verifyInProgressUnchangedByResetToPendingForWorker(worker2[0], WORKER_UUID_2, startedAt);
+ verifyUnchangedByResetToPendingForWorker(worker2[1]);
+ verifyUnchangedByResetToPendingForWorker(worker2[2]);
+ verifyInProgressUnchangedByResetToPendingForWorker(worker2[3], WORKER_UUID_2, startedAt);
}
@Test
public void resetTasksWithUnknownWorkerUUIDs_with_empty_set_resets_status_of_all_pending_tasks() {
- long startedAt = 2_099_888L;
- CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt);
- CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt);
- CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt);
- CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt);
- CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt);
- CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt);
- CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt);
- CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt);
+ CeQueueDto[] worker1 = {insertPending("u1"), insertPending("u2"), insertPending("u3"), insertPending("u4")};
+ CeQueueDto[] worker2 = {insertPending("o1"), insertPending("o2"), insertPending("o3"), insertPending("o4")};
+ long startedAt = alwaysIncreasingSystem2.now();
+ makeInProgress(WORKER_UUID_1, startedAt, worker1[0]);
+ makeInProgress(WORKER_UUID_1, startedAt, worker1[3]);
+ makeInProgress(WORKER_UUID_2, startedAt, worker2[0]);
+ makeInProgress(WORKER_UUID_2, startedAt, worker2[3]);
underTestAlwaysIncreasingSystem2.resetTasksWithUnknownWorkerUUIDs(db.getSession(), ImmutableSet.of());
- verifyResetByResetTasks(u1);
- verifyUnchangedByResetToPendingForWorker(u2);
- verifyUnchangedByResetToPendingForWorker(u3);
- verifyResetByResetTasks(u4);
- verifyResetByResetTasks(o1);
- verifyUnchangedByResetToPendingForWorker(o2);
- verifyUnchangedByResetToPendingForWorker(o3);
- verifyResetByResetTasks(o4);
+ verifyResetByResetTasks(worker1[0], startedAt);
+ verifyUnchangedByResetToPendingForWorker(worker1[1]);
+ verifyUnchangedByResetToPendingForWorker(worker1[2]);
+ verifyResetByResetTasks(worker1[3], startedAt);
+ verifyResetByResetTasks(worker2[0], startedAt);
+ verifyUnchangedByResetToPendingForWorker(worker2[1]);
+ verifyUnchangedByResetToPendingForWorker(worker2[2]);
+ verifyResetByResetTasks(worker2[3], startedAt);
}
@Test
public void resetTasksWithUnknownWorkerUUIDs_set_resets_status_of_all_pending_tasks_with_unknown_workers() {
- long startedAt = 2_099_888L;
- CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt);
- CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt);
- CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt);
- CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt);
- CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt);
- CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt);
- CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt);
- CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt);
+ CeQueueDto[] worker1 = {insertPending("u1"), insertPending("u2"), insertPending("u3"), insertPending("u4")};
+ CeQueueDto[] worker2 = {insertPending("o1"), insertPending("o2"), insertPending("o3"), insertPending("o4")};
+ long startedAt = alwaysIncreasingSystem2.now();
+ makeInProgress(WORKER_UUID_1, startedAt, worker1[0]);
+ makeInProgress(WORKER_UUID_1, startedAt, worker1[3]);
+ makeInProgress(WORKER_UUID_2, startedAt, worker2[0]);
+ makeInProgress(WORKER_UUID_2, startedAt, worker2[3]);
underTestAlwaysIncreasingSystem2.resetTasksWithUnknownWorkerUUIDs(db.getSession(), ImmutableSet.of(WORKER_UUID_1, "unknown"));
- verifyUnchangedByResetToPendingForWorker(u1);
- verifyUnchangedByResetToPendingForWorker(u2);
- verifyUnchangedByResetToPendingForWorker(u3);
- verifyUnchangedByResetToPendingForWorker(u4);
- verifyResetByResetTasks(o1);
- verifyUnchangedByResetToPendingForWorker(o2);
- verifyUnchangedByResetToPendingForWorker(o3);
- verifyResetByResetTasks(o4);
+ verifyInProgressUnchangedByResetToPendingForWorker(worker1[0], WORKER_UUID_1, startedAt);
+ verifyUnchangedByResetToPendingForWorker(worker1[1]);
+ verifyUnchangedByResetToPendingForWorker(worker1[2]);
+ verifyInProgressUnchangedByResetToPendingForWorker(worker1[3], WORKER_UUID_1, startedAt);
+ verifyResetByResetTasks(worker2[0], startedAt);
+ verifyUnchangedByResetToPendingForWorker(worker2[1]);
+ verifyUnchangedByResetToPendingForWorker(worker2[2]);
+ verifyResetByResetTasks(worker2[3], startedAt);
+ }
+
+ private CeQueueDto makeInProgress(String workerUuid, long startedAt, CeQueueDto ceQueueDto) {
+ CeQueueTesting.makeInProgress(db.getSession(), workerUuid, startedAt, ceQueueDto);
+ return underTestAlwaysIncreasingSystem2.selectByUuid(db.getSession(), ceQueueDto.getUuid()).get();
}
- private void verifyResetByResetTasks(CeQueueDto original) {
+ private void verifyResetByResetTasks(CeQueueDto original, long startedAt) {
CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
- assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING).isNotEqualTo(original.getStatus());
- assertThat(dto.getStartedAt()).isNull();
+ assertThat(dto.getStatus()).isEqualTo(PENDING);
+ assertThat(dto.getStartedAt()).isEqualTo(startedAt);
assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
- assertThat(dto.getUpdatedAt()).isGreaterThan(original.getUpdatedAt());
+ assertThat(dto.getUpdatedAt()).isGreaterThan(startedAt);
assertThat(dto.getWorkerUuid()).isNull();
}
- private void verifyResetToPendingForWorker(CeQueueDto original) {
+ private void verifyResetToPendingForWorker(CeQueueDto original, String workerUuid, long startedAt) {
CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
- assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING);
- assertThat(dto.getStartedAt()).isNull();
+ assertThat(dto.getStatus()).isEqualTo(PENDING);
+ assertThat(dto.getStartedAt()).isEqualTo(startedAt);
assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
- assertThat(dto.getUpdatedAt()).isGreaterThan(original.getUpdatedAt());
- assertThat(dto.getWorkerUuid()).isEqualTo(original.getWorkerUuid());
+ assertThat(dto.getUpdatedAt()).isGreaterThan(startedAt);
+ assertThat(dto.getWorkerUuid()).isEqualTo(workerUuid);
}
private void verifyUnchangedByResetToPendingForWorker(CeQueueDto original) {
@@ -278,20 +293,29 @@ public class CeQueueDaoTest {
assertThat(dto.getWorkerUuid()).isEqualTo(original.getWorkerUuid());
}
+ private void verifyInProgressUnchangedByResetToPendingForWorker(CeQueueDto original, String workerUuid, long startedAt) {
+ CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
+ assertThat(dto.getStatus()).isEqualTo(IN_PROGRESS);
+ assertThat(dto.getStartedAt()).isEqualTo(startedAt);
+ assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
+ assertThat(dto.getUpdatedAt()).isEqualTo(startedAt);
+ assertThat(dto.getWorkerUuid()).isEqualTo(workerUuid);
+ }
+
@Test
public void peek_none_if_no_pendings() {
assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();
// not pending, but in progress
- insert(TASK_UUID_1, COMPONENT_UUID_1, IN_PROGRESS);
+ makeInProgress(WORKER_UUID_1, 2_232_222L, insertPending(TASK_UUID_1, COMPONENT_UUID_1));
assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();
}
@Test
public void peek_oldest_pending() {
- insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING);
+ insertPending(TASK_UUID_1, COMPONENT_UUID_1);
system2.setNow(INIT_TIME + 3_000_000);
- insert(TASK_UUID_2, COMPONENT_UUID_2, PENDING);
+ insertPending(TASK_UUID_2, COMPONENT_UUID_2);
assertThat(db.countRowsOfTable("ce_queue")).isEqualTo(2);
verifyCeQueueStatuses(TASK_UUID_1, PENDING, TASK_UUID_2, PENDING);
@@ -319,9 +343,9 @@ public class CeQueueDaoTest {
@Test
public void do_not_peek_multiple_tasks_on_same_project_at_the_same_time() {
// two pending tasks on the same project
- insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING);
+ insertPending(TASK_UUID_1, COMPONENT_UUID_1);
system2.setNow(INIT_TIME + 3_000_000);
- insert(TASK_UUID_2, COMPONENT_UUID_1, PENDING);
+ insertPending(TASK_UUID_2, COMPONENT_UUID_1);
Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1);
assertThat(peek).isPresent();
@@ -343,35 +367,35 @@ public class CeQueueDaoTest {
@Test
public void select_by_query() {
// task status not in query
- insert(newCeQueueDto(TASK_UUID_1)
+ insertPending(newCeQueueDto(TASK_UUID_1)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(IN_PROGRESS)
.setTaskType(CeTaskTypes.REPORT)
.setCreatedAt(100_000L));
// too early
- insert(newCeQueueDto(TASK_UUID_3)
+ insertPending(newCeQueueDto(TASK_UUID_3)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(PENDING)
.setTaskType(CeTaskTypes.REPORT)
.setCreatedAt(90_000L));
// task type not in query
- insert(newCeQueueDto("TASK_4")
+ insertPending(newCeQueueDto("TASK_4")
.setComponentUuid("PROJECT_2")
.setStatus(PENDING)
.setTaskType("ANOTHER_TYPE")
.setCreatedAt(100_000L));
// correct
- insert(newCeQueueDto(TASK_UUID_2)
+ insertPending(newCeQueueDto(TASK_UUID_2)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(PENDING)
.setTaskType(CeTaskTypes.REPORT)
.setCreatedAt(100_000L));
// correct submitted later
- insert(newCeQueueDto("TASK_5")
+ insertPending(newCeQueueDto("TASK_5")
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(PENDING)
.setTaskType(CeTaskTypes.REPORT)
@@ -393,7 +417,7 @@ public class CeQueueDaoTest {
@Test
public void select_by_query_returns_empty_list_when_only_current() {
- insert(newCeQueueDto(TASK_UUID_1)
+ insertPending(newCeQueueDto(TASK_UUID_1)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(IN_PROGRESS)
.setTaskType(CeTaskTypes.REPORT)
@@ -410,7 +434,7 @@ public class CeQueueDaoTest {
@Test
public void select_by_query_returns_empty_list_when_max_submitted_at() {
- insert(newCeQueueDto(TASK_UUID_1)
+ insertPending(newCeQueueDto(TASK_UUID_1)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(IN_PROGRESS)
.setTaskType(CeTaskTypes.REPORT)
@@ -427,7 +451,7 @@ public class CeQueueDaoTest {
@Test
public void select_by_query_returns_empty_list_when_empty_list_of_component_uuid() {
- insert(newCeQueueDto(TASK_UUID_1)
+ insertPending(newCeQueueDto(TASK_UUID_1)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(IN_PROGRESS)
.setTaskType(CeTaskTypes.REPORT)
@@ -445,19 +469,19 @@ public class CeQueueDaoTest {
@Test
public void count_by_status_and_component_uuid() {
// task retrieved in the queue
- insert(newCeQueueDto(TASK_UUID_1)
+ insertPending(newCeQueueDto(TASK_UUID_1)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(IN_PROGRESS)
.setTaskType(CeTaskTypes.REPORT)
.setCreatedAt(100_000L));
// on component uuid 2, not returned
- insert(newCeQueueDto(TASK_UUID_2)
+ insertPending(newCeQueueDto(TASK_UUID_2)
.setComponentUuid(COMPONENT_UUID_2)
.setStatus(IN_PROGRESS)
.setTaskType(CeTaskTypes.REPORT)
.setCreatedAt(100_000L));
// pending status, not returned
- insert(newCeQueueDto(TASK_UUID_3)
+ insertPending(newCeQueueDto(TASK_UUID_3)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(PENDING)
.setTaskType(CeTaskTypes.REPORT)
@@ -470,19 +494,19 @@ public class CeQueueDaoTest {
@Test
public void count_by_status_and_component_uuids() {
// task retrieved in the queue
- insert(newCeQueueDto(TASK_UUID_1)
+ insertPending(newCeQueueDto(TASK_UUID_1)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(IN_PROGRESS)
.setTaskType(CeTaskTypes.REPORT)
.setCreatedAt(100_000L));
// on component uuid 2, not returned
- insert(newCeQueueDto(TASK_UUID_2)
+ insertPending(newCeQueueDto(TASK_UUID_2)
.setComponentUuid(COMPONENT_UUID_2)
.setStatus(IN_PROGRESS)
.setTaskType(CeTaskTypes.REPORT)
.setCreatedAt(100_000L));
// pending status, not returned
- insert(newCeQueueDto(TASK_UUID_3)
+ insertPending(newCeQueueDto(TASK_UUID_3)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(PENDING)
.setTaskType(CeTaskTypes.REPORT)
@@ -497,41 +521,28 @@ public class CeQueueDaoTest {
assertThat(underTest.countByStatus(db.getSession(), IN_PROGRESS)).isEqualTo(2);
}
- private void insert(CeQueueDto dto) {
+ private void insertPending(CeQueueDto dto) {
underTest.insert(db.getSession(), dto);
db.commit();
}
- private CeQueueDto insert(String uuid, CeQueueDto.Status status) {
- CeQueueDto dto = new CeQueueDto();
- dto.setUuid(uuid);
- dto.setTaskType(CeTaskTypes.REPORT);
- dto.setStatus(status);
- dto.setSubmitterUuid("henri");
- underTestAlwaysIncreasingSystem2.insert(db.getSession(), dto);
- db.getSession().commit();
- return dto;
- }
-
- private CeQueueDto insert(String uuid, CeQueueDto.Status status, String workerUuid, Long startedAt) {
+ private CeQueueDto insertPending(String uuid) {
CeQueueDto dto = new CeQueueDto();
dto.setUuid(uuid);
dto.setTaskType(CeTaskTypes.REPORT);
- dto.setStatus(status);
+ dto.setStatus(PENDING);
dto.setSubmitterUuid("henri");
- dto.setWorkerUuid(workerUuid);
- dto.setStartedAt(startedAt);
underTestAlwaysIncreasingSystem2.insert(db.getSession(), dto);
db.getSession().commit();
return dto;
}
- private CeQueueDto insert(String uuid, String componentUuid, CeQueueDto.Status status) {
+ private CeQueueDto insertPending(String uuid, String componentUuid) {
CeQueueDto dto = new CeQueueDto();
dto.setUuid(uuid);
dto.setTaskType(CeTaskTypes.REPORT);
dto.setComponentUuid(componentUuid);
- dto.setStatus(status);
+ dto.setStatus(PENDING);
dto.setSubmitterUuid("henri");
underTest.insert(db.getSession(), dto);
db.getSession().commit();
diff --git a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDtoTest.java b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDtoTest.java
index 85724236dac..b93c6c90fd7 100644
--- a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDtoTest.java
+++ b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDtoTest.java
@@ -93,22 +93,4 @@ public class CeQueueDtoTest {
underTest.setSubmitterUuid(str_256_chars);
}
-
- @Test
- public void setWorkerUuid_accepts_null_empty_and_string_40_chars_or_less() {
- underTest.setWorkerUuid(null);
- underTest.setWorkerUuid("");
- underTest.setWorkerUuid("bar");
- underTest.setWorkerUuid(STR_40_CHARS);
- }
-
- @Test
- public void setWorkerUuid_throws_IAE_if_value_is_41_chars() {
- String str_41_chars = STR_40_CHARS + "a";
-
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage("worker uuid is too long: " + str_41_chars);
-
- underTest.setWorkerUuid(str_41_chars);
- }
}
diff --git a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueTesting.java b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueTesting.java
index 14e2ab30db0..16330122d55 100644
--- a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueTesting.java
+++ b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueTesting.java
@@ -19,8 +19,15 @@
*/
package org.sonar.db.ce;
+import java.util.stream.Stream;
+import org.sonar.db.DbSession;
+
+import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang.RandomStringUtils.randomAlphanumeric;
import static org.apache.commons.lang.math.RandomUtils.nextLong;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS;
+import static org.sonar.db.ce.CeQueueDto.Status.PENDING;
public class CeQueueTesting {
private CeQueueTesting() {
@@ -35,9 +42,29 @@ public class CeQueueTesting {
.setTaskType(CeTaskTypes.REPORT)
.setSubmitterUuid(randomAlphanumeric(255))
.setCreatedAt(nextLong())
- .setUpdatedAt(nextLong())
- .setStartedAt(nextLong());
+ .setUpdatedAt(nextLong());
+ }
+
+ public static void makeInProgress(DbSession dbSession, String workerUuid, long now, CeQueueDto... ceQueueDtos) {
+ Stream.of(ceQueueDtos).forEach(ceQueueDto -> {
+ CeQueueMapper mapper = dbSession.getMapper(CeQueueMapper.class);
+ int touchedRows = mapper.updateIf(ceQueueDto.getUuid(),
+ new UpdateIf.NewProperties(IN_PROGRESS, workerUuid, now, now),
+ new UpdateIf.OldProperties(PENDING));
+ assertThat(touchedRows).isEqualTo(1);
+ });
}
+ public static void reset(DbSession dbSession, long now, CeQueueDto... ceQueueDtos) {
+ Stream.of(ceQueueDtos).forEach(ceQueueDto -> {
+ checkArgument(ceQueueDto.getStatus() == IN_PROGRESS);
+ checkArgument(ceQueueDto.getWorkerUuid() != null);
+ CeQueueMapper mapper = dbSession.getMapper(CeQueueMapper.class);
+ int touchedRows = mapper.updateIf(ceQueueDto.getUuid(),
+ new UpdateIf.NewProperties(PENDING, ceQueueDto.getUuid(), now, now),
+ new UpdateIf.OldProperties(IN_PROGRESS));
+ assertThat(touchedRows).isEqualTo(1);
+ });
+ }
}
diff --git a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/UpdateIfTest.java b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/UpdateIfTest.java
new file mode 100644
index 00000000000..0b10e749863
--- /dev/null
+++ b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/UpdateIfTest.java
@@ -0,0 +1,91 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.db.ce;
+
+import com.tngtech.java.junit.dataprovider.DataProvider;
+import com.tngtech.java.junit.dataprovider.DataProviderRunner;
+import com.tngtech.java.junit.dataprovider.UseDataProvider;
+import java.util.Random;
+import org.apache.commons.lang.RandomStringUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(DataProviderRunner.class)
+public class UpdateIfTest {
+ private static final String STR_40_CHARS = "0123456789012345678901234567890123456789";
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void newProperties_constructor_accepts_null_workerUuid() {
+ UpdateIf.NewProperties newProperties = new UpdateIf.NewProperties(CeQueueDto.Status.PENDING, null, 123, 456);
+
+ assertThat(newProperties.getWorkerUuid()).isNull();
+ }
+
+ @Test
+ public void newProperties_constructor_fails_with_NPE_if_status_is_null() {
+ expectedException.expect(NullPointerException.class);
+ expectedException.expectMessage("status can't be null");
+
+ new UpdateIf.NewProperties(null, "foo", 123, 456);
+ }
+
+ @Test
+ public void newProperties_constructor_fails_with_IAE_if_workerUuid_is_41_or_more() {
+ String workerUuid = RandomStringUtils.randomAlphanumeric(41 + new Random().nextInt(5));
+
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("worker uuid is too long: " + workerUuid);
+
+ new UpdateIf.NewProperties(CeQueueDto.Status.PENDING, workerUuid, 123, 456);
+ }
+
+ @Test
+ @UseDataProvider("workerUuidValidValues")
+ public void newProperties_constructor_accepts_null_empty_and_string_40_chars_or_less(String workerUuid) {
+ new UpdateIf.NewProperties(CeQueueDto.Status.PENDING, workerUuid, 123, 345);
+ }
+
+ @DataProvider
+ public static Object[][] workerUuidValidValues() {
+ return new Object[][] {
+ {null},
+ {""},
+ {"bar"},
+ {STR_40_CHARS}
+ };
+ }
+
+ @Test
+ public void newProperties_constructor_IAE_if_workerUuid_is_41_chars() {
+ String str_41_chars = STR_40_CHARS + "a";
+
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("worker uuid is too long: " + str_41_chars);
+
+ new UpdateIf.NewProperties(CeQueueDto.Status.PENDING, str_41_chars, 123, 345);
+ }
+}
diff --git a/server/sonar-server/src/test/java/org/sonar/server/ce/ws/TaskFormatterTest.java b/server/sonar-server/src/test/java/org/sonar/server/ce/ws/TaskFormatterTest.java
index 42a81c4a6d5..bcd8b84e7f1 100644
--- a/server/sonar-server/src/test/java/org/sonar/server/ce/ws/TaskFormatterTest.java
+++ b/server/sonar-server/src/test/java/org/sonar/server/ce/ws/TaskFormatterTest.java
@@ -39,6 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.sonar.db.ce.CeQueueTesting.makeInProgress;
public class TaskFormatterTest {
@@ -85,13 +86,15 @@ public class TaskFormatterTest {
CeQueueDto dto = new CeQueueDto();
dto.setUuid("UUID");
dto.setTaskType("TYPE");
- dto.setStatus(CeQueueDto.Status.IN_PROGRESS);
+ dto.setStatus(CeQueueDto.Status.PENDING);
dto.setCreatedAt(1_450_000_000_000L);
- dto.setStartedAt(1_451_000_000_000L);
dto.setComponentUuid(uuid);
dto.setSubmitterUuid(user.getUuid());
+ db.getDbClient().ceQueueDao().insert(db.getSession(), dto);
+ makeInProgress(db.getSession(), "workerUuid", 1_958_000_000_000L, dto);
+ CeQueueDto inProgress = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), dto.getUuid()).get();
- Ce.Task wsTask = underTest.formatQueue(db.getSession(), dto);
+ Ce.Task wsTask = underTest.formatQueue(db.getSession(), inProgress);
assertThat(wsTask.getType()).isEqualTo("TYPE");
assertThat(wsTask.getId()).isEqualTo("UUID");
@@ -130,12 +133,14 @@ public class TaskFormatterTest {
CeQueueDto dto = new CeQueueDto();
dto.setUuid("UUID");
dto.setTaskType("TYPE");
- dto.setStatus(CeQueueDto.Status.IN_PROGRESS);
+ dto.setStatus(CeQueueDto.Status.PENDING);
dto.setCreatedAt(1_450_000_000_000L);
- dto.setStartedAt(startedAt);
+ db.getDbClient().ceQueueDao().insert(db.getSession(), dto);
+ makeInProgress(db.getSession(), "workerUuid", startedAt, dto);
+ CeQueueDto inProgress = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), dto.getUuid()).get();
when(system2.now()).thenReturn(now);
- Ce.Task wsTask = underTest.formatQueue(db.getSession(), dto);
+ Ce.Task wsTask = underTest.formatQueue(db.getSession(), inProgress);
assertThat(wsTask.getExecutionTimeMs()).isEqualTo(now - startedAt);
}