@@ -109,20 +109,21 @@ public class CeQueueDao implements Dao { | |||
return mapper(dbSession).countByStatusAndComponentUuid(status, componentUuid); | |||
} | |||
public Optional<CeQueueDto> peek(DbSession session) { | |||
public Optional<CeQueueDto> peek(DbSession session, String workerUuid) { | |||
List<EligibleTaskDto> eligibles = mapper(session).selectEligibleForPeek(ONE_ROW_LIMIT); | |||
if (eligibles.isEmpty()) { | |||
return Optional.absent(); | |||
} | |||
EligibleTaskDto eligible = eligibles.get(0); | |||
return tryToPeek(session, eligible); | |||
return tryToPeek(session, eligible, workerUuid); | |||
} | |||
private Optional<CeQueueDto> tryToPeek(DbSession session, EligibleTaskDto eligible) { | |||
private Optional<CeQueueDto> tryToPeek(DbSession session, EligibleTaskDto eligible, String workerUuid) { | |||
long now = system2.now(); | |||
int touchedRows = mapper(session).updateIf(eligible.getUuid(), | |||
new UpdateIf.NewProperties(IN_PROGRESS, system2.now(), system2.now()), | |||
new UpdateIf.OldProperties(PENDING)); | |||
new UpdateIf.NewProperties(IN_PROGRESS, workerUuid, eligible.getExecutionCount() + 1, now, now), | |||
new UpdateIf.OldProperties(PENDING, eligible.getExecutionCount())); | |||
if (touchedRows != 1) { | |||
return Optional.absent(); | |||
} |
@@ -20,6 +20,7 @@ | |||
package org.sonar.db.ce; | |||
import javax.annotation.CheckForNull; | |||
import javax.annotation.Nullable; | |||
import javax.annotation.concurrent.Immutable; | |||
import static java.util.Objects.requireNonNull; | |||
@@ -32,11 +33,16 @@ final class UpdateIf { | |||
@Immutable | |||
public static class NewProperties { | |||
private final CeQueueDto.Status status; | |||
private final String workerUuid; | |||
private final int executionCount; | |||
private final Long startedAt; | |||
private final long updatedAt; | |||
NewProperties(CeQueueDto.Status status, Long startedAt, long updatedAt) { | |||
NewProperties(CeQueueDto.Status status, @Nullable String workerUuid, int executionCount, | |||
Long startedAt, long updatedAt) { | |||
this.status = requireNonNull(status, "status can't be null"); | |||
this.workerUuid = workerUuid; | |||
this.executionCount = executionCount; | |||
this.startedAt = startedAt; | |||
this.updatedAt = updatedAt; | |||
} | |||
@@ -45,6 +51,15 @@ final class UpdateIf { | |||
return status; | |||
} | |||
@CheckForNull | |||
public String getWorkerUuid() { | |||
return workerUuid; | |||
} | |||
public int getExecutionCount() { | |||
return executionCount; | |||
} | |||
@CheckForNull | |||
public Long getStartedAt() { | |||
return startedAt; | |||
@@ -58,14 +73,20 @@ final class UpdateIf { | |||
@Immutable | |||
public static class OldProperties { | |||
private final CeQueueDto.Status status; | |||
private final int executionCount; | |||
OldProperties(CeQueueDto.Status status) { | |||
OldProperties(CeQueueDto.Status status, int executionCount) { | |||
this.status = requireNonNull(status, "status can't be null"); | |||
this.executionCount = executionCount; | |||
} | |||
public CeQueueDto.Status getStatus() { | |||
return status; | |||
} | |||
public int getExecutionCount() { | |||
return executionCount; | |||
} | |||
} | |||
} |
@@ -172,11 +172,14 @@ | |||
<update id="updateIf" parameterType="map"> | |||
update ce_queue set | |||
status=#{new.status,jdbcType=VARCHAR}, | |||
execution_count=#{new.executionCount,jdbcType=INTEGER}, | |||
worker_uuid=#{new.workerUuid,jdbcType=VARCHAR}, | |||
started_at=#{new.startedAt,jdbcType=BIGINT}, | |||
updated_at=#{new.updatedAt,jdbcType=BIGINT} | |||
where | |||
uuid=#{uuid,jdbcType=VARCHAR} | |||
and status=#{old.status,jdbcType=VARCHAR} | |||
and execution_count=#{old.executionCount,jdbcType=INTEGER} | |||
</update> | |||
<delete id="deleteByUuid"> |
@@ -54,7 +54,8 @@ public class CeQueueDaoTest { | |||
private static final String TASK_UUID_3 = "TASK_3"; | |||
private static final String SELECT_QUEUE_UUID_AND_STATUS_QUERY = "select uuid,status from ce_queue"; | |||
private static final String SUBMITTER_LOGIN = "henri"; | |||
private static final String WORKER_UUID = "worker uuid"; | |||
private static final String WORKER_UUID_1 = "worker uuid 1"; | |||
private static final String WORKER_UUID_2 = "worker uuid 2"; | |||
private static final int EXECUTION_COUNT = 42; | |||
private TestSystem2 system2 = new TestSystem2().setNow(INIT_TIME); | |||
@@ -62,30 +63,31 @@ public class CeQueueDaoTest { | |||
@Rule | |||
public DbTester db = DbTester.create(system2); | |||
private System2 mockedSystem2 = mock(System2.class); | |||
private CeQueueDao underTest = new CeQueueDao(system2); | |||
private CeQueueDao underTestWithSystem2Mock = new CeQueueDao(mockedSystem2); | |||
@Test | |||
public void insert_populates_createdAt_and_updateAt_from_System2_with_same_value_if_any_is_not_set() { | |||
System2 system2 = mock(System2.class); | |||
CeQueueDao ceQueueDao = new CeQueueDao(system2); | |||
long now = 1_334_333L; | |||
CeQueueDto dto = new CeQueueDto() | |||
.setTaskType(CeTaskTypes.REPORT) | |||
.setComponentUuid(COMPONENT_UUID_1) | |||
.setStatus(PENDING) | |||
.setSubmitterLogin(SUBMITTER_LOGIN) | |||
.setWorkerUuid(WORKER_UUID) | |||
.setWorkerUuid(WORKER_UUID_1) | |||
.setExecutionCount(EXECUTION_COUNT); | |||
mockSystem2ForSingleCall(system2, now); | |||
ceQueueDao.insert(db.getSession(), dto.setUuid(TASK_UUID_1)); | |||
mockSystem2ForSingleCall(system2, now); | |||
ceQueueDao.insert(db.getSession(), dto.setUuid(TASK_UUID_2).setCreatedAt(8_000_999L).setUpdatedAt(0)); | |||
mockSystem2ForSingleCall(system2, now); | |||
ceQueueDao.insert(db.getSession(), dto.setUuid(TASK_UUID_3).setCreatedAt(0).setUpdatedAt(8_000_999L)); | |||
mockSystem2ForSingleCall(system2, now); | |||
mockSystem2ForSingleCall(now); | |||
underTestWithSystem2Mock.insert(db.getSession(), dto.setUuid(TASK_UUID_1)); | |||
mockSystem2ForSingleCall(now); | |||
underTestWithSystem2Mock.insert(db.getSession(), dto.setUuid(TASK_UUID_2).setCreatedAt(8_000_999L).setUpdatedAt(0)); | |||
mockSystem2ForSingleCall(now); | |||
underTestWithSystem2Mock.insert(db.getSession(), dto.setUuid(TASK_UUID_3).setCreatedAt(0).setUpdatedAt(8_000_999L)); | |||
mockSystem2ForSingleCall(now); | |||
String uuid4 = "uuid 4"; | |||
ceQueueDao.insert(db.getSession(), dto.setUuid(uuid4).setCreatedAt(6_888_777L).setUpdatedAt(8_000_999L)); | |||
underTestWithSystem2Mock.insert(db.getSession(), dto.setUuid(uuid4).setCreatedAt(6_888_777L).setUpdatedAt(8_000_999L)); | |||
db.getSession().commit(); | |||
Stream.of(TASK_UUID_1, TASK_UUID_2, TASK_UUID_3) | |||
@@ -96,7 +98,7 @@ public class CeQueueDaoTest { | |||
assertThat(saved.getComponentUuid()).isEqualTo(COMPONENT_UUID_1); | |||
assertThat(saved.getStatus()).isEqualTo(PENDING); | |||
assertThat(saved.getSubmitterLogin()).isEqualTo(SUBMITTER_LOGIN); | |||
assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID); | |||
assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID_1); | |||
assertThat(saved.getExecutionCount()).isEqualTo(EXECUTION_COUNT); | |||
assertThat(saved.getCreatedAt()).isEqualTo(now); | |||
assertThat(saved.getUpdatedAt()).isEqualTo(now); | |||
@@ -108,18 +110,13 @@ public class CeQueueDaoTest { | |||
assertThat(saved.getComponentUuid()).isEqualTo(COMPONENT_UUID_1); | |||
assertThat(saved.getStatus()).isEqualTo(PENDING); | |||
assertThat(saved.getSubmitterLogin()).isEqualTo(SUBMITTER_LOGIN); | |||
assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID); | |||
assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID_1); | |||
assertThat(saved.getExecutionCount()).isEqualTo(EXECUTION_COUNT); | |||
assertThat(saved.getCreatedAt()).isEqualTo(6_888_777L); | |||
assertThat(saved.getUpdatedAt()).isEqualTo(8_000_999L); | |||
assertThat(saved.getStartedAt()).isNull(); | |||
} | |||
private void mockSystem2ForSingleCall(System2 system2, long now) { | |||
Mockito.reset(system2); | |||
when(system2.now()).thenReturn(now).thenThrow(new IllegalStateException("now should be called only once")); | |||
} | |||
@Test | |||
public void test_selectByUuid() { | |||
insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING); | |||
@@ -182,13 +179,57 @@ public class CeQueueDaoTest { | |||
verifyCeQueueStatuses(TASK_UUID_1, PENDING, TASK_UUID_2, PENDING, TASK_UUID_3, PENDING); | |||
} | |||
@Test | |||
public void resetAllToPendingStatus_updates_updatedAt() { | |||
long now = 1_334_333L; | |||
insert(TASK_UUID_1, COMPONENT_UUID_1, IN_PROGRESS); | |||
insert(TASK_UUID_2, COMPONENT_UUID_1, IN_PROGRESS); | |||
mockSystem2ForSingleCall(now); | |||
underTestWithSystem2Mock.resetAllToPendingStatus(db.getSession()); | |||
assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_1).get().getUpdatedAt()).isEqualTo(now); | |||
assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_2).get().getUpdatedAt()).isEqualTo(now); | |||
} | |||
@Test | |||
public void resetAllToPendingStatus_resets_startedAt() { | |||
assertThat(insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING).getStartedAt()).isNull(); | |||
assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).get().getUuid()).isEqualTo(TASK_UUID_1); | |||
assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_1).get().getStartedAt()).isNotNull(); | |||
underTest.resetAllToPendingStatus(db.getSession()); | |||
assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_1).get().getStartedAt()).isNull(); | |||
} | |||
@Test | |||
public void resetAllToPendingStatus_does_not_reset_workerUuid_nor_executionCount() { | |||
CeQueueDto dto = new CeQueueDto() | |||
.setUuid(TASK_UUID_1) | |||
.setTaskType(CeTaskTypes.REPORT) | |||
.setComponentUuid(COMPONENT_UUID_1) | |||
.setStatus(IN_PROGRESS) | |||
.setSubmitterLogin(SUBMITTER_LOGIN) | |||
.setWorkerUuid(WORKER_UUID_1) | |||
.setExecutionCount(EXECUTION_COUNT); | |||
underTest.insert(db.getSession(), dto); | |||
db.commit(); | |||
underTest.resetAllToPendingStatus(db.getSession()); | |||
CeQueueDto saved = underTest.selectByUuid(db.getSession(), TASK_UUID_1).get(); | |||
assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID_1); | |||
assertThat(saved.getExecutionCount()).isEqualTo(EXECUTION_COUNT); | |||
} | |||
@Test | |||
public void peek_none_if_no_pendings() throws Exception { | |||
assertThat(underTest.peek(db.getSession()).isPresent()).isFalse(); | |||
assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse(); | |||
// not pending, but in progress | |||
insert(TASK_UUID_1, COMPONENT_UUID_1, IN_PROGRESS); | |||
assertThat(underTest.peek(db.getSession()).isPresent()).isFalse(); | |||
assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse(); | |||
} | |||
@Test | |||
@@ -201,21 +242,25 @@ public class CeQueueDaoTest { | |||
verifyCeQueueStatuses(TASK_UUID_1, PENDING, TASK_UUID_2, PENDING); | |||
// peek first one | |||
Optional<CeQueueDto> peek = underTest.peek(db.getSession()); | |||
Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1); | |||
assertThat(peek.isPresent()).isTrue(); | |||
assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1); | |||
assertThat(peek.get().getStatus()).isEqualTo(IN_PROGRESS); | |||
assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_1); | |||
assertThat(peek.get().getExecutionCount()).isEqualTo(1); | |||
verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, PENDING); | |||
// peek second one | |||
peek = underTest.peek(db.getSession()); | |||
peek = underTest.peek(db.getSession(), WORKER_UUID_2); | |||
assertThat(peek.isPresent()).isTrue(); | |||
assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_2); | |||
assertThat(peek.get().getStatus()).isEqualTo(IN_PROGRESS); | |||
assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_2); | |||
assertThat(peek.get().getExecutionCount()).isEqualTo(1); | |||
verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, IN_PROGRESS); | |||
// no more pendings | |||
assertThat(underTest.peek(db.getSession()).isPresent()).isFalse(); | |||
assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse(); | |||
} | |||
@Test | |||
@@ -225,19 +270,23 @@ public class CeQueueDaoTest { | |||
system2.setNow(INIT_TIME + 3_000_000); | |||
insert(TASK_UUID_2, COMPONENT_UUID_1, PENDING); | |||
Optional<CeQueueDto> peek = underTest.peek(db.getSession()); | |||
Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1); | |||
assertThat(peek.isPresent()).isTrue(); | |||
assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1); | |||
assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_1); | |||
assertThat(peek.get().getExecutionCount()).isEqualTo(1); | |||
verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, PENDING); | |||
// do not peek second task as long as the first one is in progress | |||
peek = underTest.peek(db.getSession()); | |||
peek = underTest.peek(db.getSession(), WORKER_UUID_1); | |||
assertThat(peek.isPresent()).isFalse(); | |||
// first one is finished | |||
underTest.deleteByUuid(db.getSession(), TASK_UUID_1); | |||
peek = underTest.peek(db.getSession()); | |||
peek = underTest.peek(db.getSession(), WORKER_UUID_2); | |||
assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_2); | |||
assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_2); | |||
assertThat(peek.get().getExecutionCount()).isEqualTo(1); | |||
} | |||
@Test | |||
@@ -372,7 +421,7 @@ public class CeQueueDaoTest { | |||
db.commit(); | |||
} | |||
private void insert(String uuid, String componentUuid, CeQueueDto.Status status) { | |||
private CeQueueDto insert(String uuid, String componentUuid, CeQueueDto.Status status) { | |||
CeQueueDto dto = new CeQueueDto(); | |||
dto.setUuid(uuid); | |||
dto.setTaskType(CeTaskTypes.REPORT); | |||
@@ -381,6 +430,7 @@ public class CeQueueDaoTest { | |||
dto.setSubmitterLogin("henri"); | |||
underTest.insert(db.getSession(), dto); | |||
db.getSession().commit(); | |||
return dto; | |||
} | |||
private static Iterable<Map<String, Object>> upperizeKeys(List<Map<String, Object>> select) { | |||
@@ -417,4 +467,11 @@ public class CeQueueDaoTest { | |||
private static Map<String, Object> rowMap(String uuid, CeQueueDto.Status status) { | |||
return ImmutableMap.of("UUID", uuid, "STATUS", status.name()); | |||
} | |||
private void mockSystem2ForSingleCall(long now) { | |||
Mockito.reset(mockedSystem2); | |||
when(mockedSystem2.now()) | |||
.thenReturn(now) | |||
.thenThrow(new IllegalStateException("now should be called only once")); | |||
} | |||
} |
@@ -57,9 +57,8 @@ public class DbVersion64 implements DbVersion { | |||
.add(1625, "Populate column ORGANIZATIONS.DEFAULT_GROUP_ID", PopulateColumnDefaultGroupIdOfOrganizations.class) | |||
.add(1626, "Clean orphan rows in table GROUPS_USERS", CleanOrphanRowsInGroupsUsers.class) | |||
.add(1627, "Delete permission templates linked to removed users", DeletePermissionTemplatesLinkedToRemovedUsers.class) | |||
; | |||
.add(1628, "Add columns CE_QUEUE.WORKER_UUID and EXECUTION_COUNT", AddCeQueueWorkerUuidAndExecutionCount.class); | |||
.add(1629, "Make CE_QUEUE.EXECUTION_COUNT not nullable", MakeCeQueueExecutionCountNotNullable.class); | |||
.add(1628, "Add columns CE_QUEUE.WORKER_UUID and EXECUTION_COUNT", AddCeQueueWorkerUuidAndExecutionCount.class) | |||
.add(1629, "Make CE_QUEUE.EXECUTION_COUNT not nullable", MakeCeQueueExecutionCountNotNullable.class) | |||
.add(1630, "Add columns CE_ACTIVITY.WORKER_UUID and EXECUTION_COUNT", AddCeActivityWorkerUuidAndExecutionCount.class) | |||
.add(1631, "Make columns CE_ACTIVITY.EXECUTION_COUNT not nullable", MakeCeActivityExecutionCountNotNullable.class); | |||
} |
@@ -35,7 +35,7 @@ public class DbVersion64Test { | |||
@Test | |||
public void verify_migration_count() { | |||
verifyMigrationCount(underTest, 31); | |||
verifyMigrationCount(underTest, 32); | |||
} | |||
} | |||
@@ -46,7 +46,7 @@ public interface InternalCeQueue extends CeQueue { | |||
* | |||
* <p>An unchecked exception may be thrown on technical errors (db connection, ...).</p> | |||
*/ | |||
Optional<CeTask> peek(); | |||
Optional<CeTask> peek(String workerUuid); | |||
/** | |||
* Removes all the tasks from the queue, whatever their status. They are marked |
@@ -43,6 +43,7 @@ import org.sonar.server.organization.DefaultOrganizationProvider; | |||
import static com.google.common.base.Preconditions.checkArgument; | |||
import static com.google.common.base.Preconditions.checkState; | |||
import static java.util.Objects.requireNonNull; | |||
@ComputeEngineSide | |||
public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue { | |||
@@ -63,12 +64,14 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue | |||
} | |||
@Override | |||
public Optional<CeTask> peek() { | |||
public Optional<CeTask> peek(String workerUuid) { | |||
requireNonNull(workerUuid, "workerUuid can't be null"); | |||
if (peekPaused.get()) { | |||
return Optional.absent(); | |||
} | |||
try (DbSession dbSession = dbClient.openSession(false)) { | |||
Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession); | |||
Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession, workerUuid); | |||
CeTask task = null; | |||
if (dto.isPresent()) { | |||
task = loadTask(dbSession, dto.get()); |
@@ -64,7 +64,7 @@ public class CeWorkerCallableImpl implements CeWorkerCallable { | |||
private Optional<CeTask> tryAndFindTaskToExecute() { | |||
try { | |||
return queue.peek(); | |||
return queue.peek("UNKNOWN" /*FIXME provide a real worker uuid*/); | |||
} catch (Exception e) { | |||
LOG.error("Failed to pop the queue of analysis reports", e); | |||
} |
@@ -45,6 +45,7 @@ import static org.hamcrest.Matchers.startsWith; | |||
public class CeQueueImplTest { | |||
private static final String WORKER_UUID = "workerUuid"; | |||
private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L); | |||
@Rule | |||
@@ -146,7 +147,7 @@ public class CeQueueImplTest { | |||
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); | |||
dbTester.getDbClient().ceQueueDao().peek(session); | |||
dbTester.getDbClient().ceQueueDao().peek(session, WORKER_UUID); | |||
underTest.cancel(task.getUuid()); | |||
} | |||
@@ -157,7 +158,7 @@ public class CeQueueImplTest { | |||
CeTask pendingTask1 = submit(CeTaskTypes.REPORT, "PROJECT_2"); | |||
CeTask pendingTask2 = submit(CeTaskTypes.REPORT, "PROJECT_3"); | |||
dbTester.getDbClient().ceQueueDao().peek(session); | |||
dbTester.getDbClient().ceQueueDao().peek(session, WORKER_UUID); | |||
int canceledCount = underTest.cancelAll(); | |||
assertThat(canceledCount).isEqualTo(2); |
@@ -56,6 +56,8 @@ import static org.mockito.Mockito.when; | |||
public class InternalCeQueueImplTest { | |||
private static final String AN_ANALYSIS_UUID = "U1"; | |||
private static final String WORKER_UUID_1 = "worker uuid 1"; | |||
private static final String WORKER_UUID_2 = "worker uuid 2"; | |||
private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L); | |||
@@ -136,15 +138,23 @@ public class InternalCeQueueImplTest { | |||
verifyCeTask(taskSubmit2, tasks.get(1), null); | |||
} | |||
@Test | |||
public void peek_throws_NPE_if_workerUUid_is_null() { | |||
expectedException.expect(NullPointerException.class); | |||
expectedException.expectMessage("workerUuid can't be null"); | |||
underTest.peek(null); | |||
} | |||
@Test | |||
public void test_remove() { | |||
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); | |||
Optional<CeTask> peek = underTest.peek(); | |||
Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); | |||
underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, null, null); | |||
// queue is empty | |||
assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), task.getUuid()).isPresent()).isFalse(); | |||
assertThat(underTest.peek().isPresent()).isFalse(); | |||
assertThat(underTest.peek(WORKER_UUID_2).isPresent()).isFalse(); | |||
// available in history | |||
Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid()); | |||
@@ -173,7 +183,7 @@ public class InternalCeQueueImplTest { | |||
@Test | |||
public void remove_does_not_set_analysisUuid_in_CeActivity_when_CeTaskResult_has_no_analysis_uuid() { | |||
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); | |||
Optional<CeTask> peek = underTest.peek(); | |||
Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); | |||
underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(null), null); | |||
// available in history | |||
@@ -186,7 +196,7 @@ public class InternalCeQueueImplTest { | |||
public void remove_sets_snapshotId_in_CeActivity_when_CeTaskResult_has_no_snapshot_id() { | |||
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); | |||
Optional<CeTask> peek = underTest.peek(); | |||
Optional<CeTask> peek = underTest.peek(WORKER_UUID_2); | |||
underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(AN_ANALYSIS_UUID), null); | |||
// available in history | |||
@@ -200,7 +210,7 @@ public class InternalCeQueueImplTest { | |||
Throwable error = new NullPointerException("Fake NPE to test persistence to DB"); | |||
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); | |||
Optional<CeTask> peek = underTest.peek(); | |||
Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); | |||
underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error); | |||
Optional<CeActivityDto> activityDto = dbTester.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid()); | |||
@@ -251,14 +261,14 @@ public class InternalCeQueueImplTest { | |||
public void test_peek() throws Exception { | |||
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); | |||
Optional<CeTask> peek = underTest.peek(); | |||
Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); | |||
assertThat(peek.isPresent()).isTrue(); | |||
assertThat(peek.get().getUuid()).isEqualTo(task.getUuid()); | |||
assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT); | |||
assertThat(peek.get().getComponentUuid()).isEqualTo("PROJECT_1"); | |||
// no more pending tasks | |||
peek = underTest.peek(); | |||
peek = underTest.peek(WORKER_UUID_2); | |||
assertThat(peek.isPresent()).isFalse(); | |||
} | |||
@@ -284,7 +294,7 @@ public class InternalCeQueueImplTest { | |||
submit(CeTaskTypes.REPORT, "PROJECT_1"); | |||
underTest.pausePeek(); | |||
Optional<CeTask> peek = underTest.peek(); | |||
Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); | |||
assertThat(peek.isPresent()).isFalse(); | |||
} | |||
@@ -326,7 +336,7 @@ public class InternalCeQueueImplTest { | |||
expectedException.expectMessage(startsWith("Task is in progress and can't be canceled")); | |||
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); | |||
underTest.peek(); | |||
underTest.peek(WORKER_UUID_2); | |||
underTest.cancel(task.getUuid()); | |||
} | |||
@@ -336,7 +346,7 @@ public class InternalCeQueueImplTest { | |||
CeTask inProgressTask = submit(CeTaskTypes.REPORT, "PROJECT_1"); | |||
CeTask pendingTask1 = submit(CeTaskTypes.REPORT, "PROJECT_2"); | |||
CeTask pendingTask2 = submit(CeTaskTypes.REPORT, "PROJECT_3"); | |||
underTest.peek(); | |||
underTest.peek(WORKER_UUID_2); | |||
int canceledCount = underTest.cancelAll(); | |||
assertThat(canceledCount).isEqualTo(2); |
@@ -44,6 +44,8 @@ import static org.mockito.Mockito.when; | |||
public class CeWorkerCallableImplTest { | |||
private static final String UNKNOWN_WORKER_UUID = "UNKNOWN"; | |||
@Rule | |||
public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule(); | |||
@Rule | |||
@@ -57,7 +59,7 @@ public class CeWorkerCallableImplTest { | |||
@Test | |||
public void no_pending_tasks_in_queue() throws Exception { | |||
when(queue.peek()).thenReturn(Optional.<CeTask>absent()); | |||
when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.<CeTask>absent()); | |||
assertThat(underTest.call()).isFalse(); | |||
@@ -68,7 +70,7 @@ public class CeWorkerCallableImplTest { | |||
public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception { | |||
CeTask task = createCeTask(null); | |||
taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT); | |||
when(queue.peek()).thenReturn(Optional.of(task)); | |||
when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task)); | |||
assertThat(underTest.call()).isTrue(); | |||
@@ -81,7 +83,7 @@ public class CeWorkerCallableImplTest { | |||
public void peek_and_process_task() throws Exception { | |||
CeTask task = createCeTask(null); | |||
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); | |||
when(queue.peek()).thenReturn(Optional.of(task)); | |||
when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task)); | |||
assertThat(underTest.call()).isTrue(); | |||
@@ -94,7 +96,7 @@ public class CeWorkerCallableImplTest { | |||
@Test | |||
public void fail_to_process_task() throws Exception { | |||
CeTask task = createCeTask(null); | |||
when(queue.peek()).thenReturn(Optional.of(task)); | |||
when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task)); | |||
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); | |||
Throwable error = makeTaskProcessorFail(task); | |||
@@ -108,7 +110,7 @@ public class CeWorkerCallableImplTest { | |||
@Test | |||
public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_success() throws Exception { | |||
when(queue.peek()).thenReturn(Optional.of(createCeTask(null))); | |||
when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask(null))); | |||
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); | |||
underTest.call(); | |||
@@ -123,7 +125,7 @@ public class CeWorkerCallableImplTest { | |||
@Test | |||
public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_error() throws Exception { | |||
CeTask ceTask = createCeTask(null); | |||
when(queue.peek()).thenReturn(Optional.of(ceTask)); | |||
when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask)); | |||
taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor); | |||
makeTaskProcessorFail(ceTask); | |||
@@ -142,7 +144,7 @@ public class CeWorkerCallableImplTest { | |||
@Test | |||
public void display_submitterLogin_in_logs_when_set_in_case_of_success() throws Exception { | |||
when(queue.peek()).thenReturn(Optional.of(createCeTask("FooBar"))); | |||
when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask("FooBar"))); | |||
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); | |||
underTest.call(); | |||
@@ -158,7 +160,7 @@ public class CeWorkerCallableImplTest { | |||
@Test | |||
public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception { | |||
CeTask ceTask = createCeTask("FooBar"); | |||
when(queue.peek()).thenReturn(Optional.of(ceTask)); | |||
when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask)); | |||
taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor); | |||
makeTaskProcessorFail(ceTask); | |||
@@ -177,7 +179,7 @@ public class CeWorkerCallableImplTest { | |||
public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception { | |||
logTester.setLevel(LoggerLevel.DEBUG); | |||
when(queue.peek()).thenReturn(Optional.of(createCeTask("FooBar"))); | |||
when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask("FooBar"))); | |||
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); | |||
underTest.call(); | |||
@@ -195,7 +197,7 @@ public class CeWorkerCallableImplTest { | |||
logTester.setLevel(LoggerLevel.DEBUG); | |||
CeTask ceTask = createCeTask("FooBar"); | |||
when(queue.peek()).thenReturn(Optional.of(ceTask)); | |||
when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask)); | |||
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); | |||
makeTaskProcessorFail(ceTask); | |||