return mapper(dbSession).countByStatusAndComponentUuid(status, componentUuid);
}
- public Optional<CeQueueDto> peek(DbSession session) {
+ public Optional<CeQueueDto> peek(DbSession session, String workerUuid) {
List<EligibleTaskDto> eligibles = mapper(session).selectEligibleForPeek(ONE_ROW_LIMIT);
if (eligibles.isEmpty()) {
return Optional.absent();
}
EligibleTaskDto eligible = eligibles.get(0);
- return tryToPeek(session, eligible);
+ return tryToPeek(session, eligible, workerUuid);
}
- private Optional<CeQueueDto> tryToPeek(DbSession session, EligibleTaskDto eligible) {
+ private Optional<CeQueueDto> tryToPeek(DbSession session, EligibleTaskDto eligible, String workerUuid) {
+ long now = system2.now();
int touchedRows = mapper(session).updateIf(eligible.getUuid(),
- new UpdateIf.NewProperties(IN_PROGRESS, system2.now(), system2.now()),
- new UpdateIf.OldProperties(PENDING));
+ new UpdateIf.NewProperties(IN_PROGRESS, workerUuid, eligible.getExecutionCount() + 1, now, now),
+ new UpdateIf.OldProperties(PENDING, eligible.getExecutionCount()));
if (touchedRows != 1) {
return Optional.absent();
}
package org.sonar.db.ce;
import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import static java.util.Objects.requireNonNull;
@Immutable
public static class NewProperties {
private final CeQueueDto.Status status;
+ private final String workerUuid;
+ private final int executionCount;
private final Long startedAt;
private final long updatedAt;
- NewProperties(CeQueueDto.Status status, Long startedAt, long updatedAt) {
+ NewProperties(CeQueueDto.Status status, @Nullable String workerUuid, int executionCount,
+ Long startedAt, long updatedAt) {
this.status = requireNonNull(status, "status can't be null");
+ this.workerUuid = workerUuid;
+ this.executionCount = executionCount;
this.startedAt = startedAt;
this.updatedAt = updatedAt;
}
return status;
}
+ @CheckForNull
+ public String getWorkerUuid() {
+ return workerUuid;
+ }
+
+ public int getExecutionCount() {
+ return executionCount;
+ }
+
@CheckForNull
public Long getStartedAt() {
return startedAt;
@Immutable
public static class OldProperties {
private final CeQueueDto.Status status;
+ private final int executionCount;
- OldProperties(CeQueueDto.Status status) {
+ OldProperties(CeQueueDto.Status status, int executionCount) {
this.status = requireNonNull(status, "status can't be null");
+ this.executionCount = executionCount;
}
public CeQueueDto.Status getStatus() {
return status;
}
+
+ public int getExecutionCount() {
+ return executionCount;
+ }
}
}
<update id="updateIf" parameterType="map">
update ce_queue set
status=#{new.status,jdbcType=VARCHAR},
+ execution_count=#{new.executionCount,jdbcType=INTEGER},
+ worker_uuid=#{new.workerUuid,jdbcType=VARCHAR},
started_at=#{new.startedAt,jdbcType=BIGINT},
updated_at=#{new.updatedAt,jdbcType=BIGINT}
where
uuid=#{uuid,jdbcType=VARCHAR}
and status=#{old.status,jdbcType=VARCHAR}
+ and execution_count=#{old.executionCount,jdbcType=INTEGER}
</update>
<delete id="deleteByUuid">
private static final String TASK_UUID_3 = "TASK_3";
private static final String SELECT_QUEUE_UUID_AND_STATUS_QUERY = "select uuid,status from ce_queue";
private static final String SUBMITTER_LOGIN = "henri";
- private static final String WORKER_UUID = "worker uuid";
+ private static final String WORKER_UUID_1 = "worker uuid 1";
+ private static final String WORKER_UUID_2 = "worker uuid 2";
private static final int EXECUTION_COUNT = 42;
private TestSystem2 system2 = new TestSystem2().setNow(INIT_TIME);
@Rule
public DbTester db = DbTester.create(system2);
+ private System2 mockedSystem2 = mock(System2.class);
+
private CeQueueDao underTest = new CeQueueDao(system2);
+ private CeQueueDao underTestWithSystem2Mock = new CeQueueDao(mockedSystem2);
@Test
public void insert_populates_createdAt_and_updateAt_from_System2_with_same_value_if_any_is_not_set() {
- System2 system2 = mock(System2.class);
- CeQueueDao ceQueueDao = new CeQueueDao(system2);
long now = 1_334_333L;
CeQueueDto dto = new CeQueueDto()
.setTaskType(CeTaskTypes.REPORT)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(PENDING)
.setSubmitterLogin(SUBMITTER_LOGIN)
- .setWorkerUuid(WORKER_UUID)
+ .setWorkerUuid(WORKER_UUID_1)
.setExecutionCount(EXECUTION_COUNT);
- mockSystem2ForSingleCall(system2, now);
- ceQueueDao.insert(db.getSession(), dto.setUuid(TASK_UUID_1));
- mockSystem2ForSingleCall(system2, now);
- ceQueueDao.insert(db.getSession(), dto.setUuid(TASK_UUID_2).setCreatedAt(8_000_999L).setUpdatedAt(0));
- mockSystem2ForSingleCall(system2, now);
- ceQueueDao.insert(db.getSession(), dto.setUuid(TASK_UUID_3).setCreatedAt(0).setUpdatedAt(8_000_999L));
- mockSystem2ForSingleCall(system2, now);
+ mockSystem2ForSingleCall(now);
+ underTestWithSystem2Mock.insert(db.getSession(), dto.setUuid(TASK_UUID_1));
+ mockSystem2ForSingleCall(now);
+ underTestWithSystem2Mock.insert(db.getSession(), dto.setUuid(TASK_UUID_2).setCreatedAt(8_000_999L).setUpdatedAt(0));
+ mockSystem2ForSingleCall(now);
+ underTestWithSystem2Mock.insert(db.getSession(), dto.setUuid(TASK_UUID_3).setCreatedAt(0).setUpdatedAt(8_000_999L));
+ mockSystem2ForSingleCall(now);
String uuid4 = "uuid 4";
- ceQueueDao.insert(db.getSession(), dto.setUuid(uuid4).setCreatedAt(6_888_777L).setUpdatedAt(8_000_999L));
+ underTestWithSystem2Mock.insert(db.getSession(), dto.setUuid(uuid4).setCreatedAt(6_888_777L).setUpdatedAt(8_000_999L));
db.getSession().commit();
Stream.of(TASK_UUID_1, TASK_UUID_2, TASK_UUID_3)
assertThat(saved.getComponentUuid()).isEqualTo(COMPONENT_UUID_1);
assertThat(saved.getStatus()).isEqualTo(PENDING);
assertThat(saved.getSubmitterLogin()).isEqualTo(SUBMITTER_LOGIN);
- assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID);
+ assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID_1);
assertThat(saved.getExecutionCount()).isEqualTo(EXECUTION_COUNT);
assertThat(saved.getCreatedAt()).isEqualTo(now);
assertThat(saved.getUpdatedAt()).isEqualTo(now);
assertThat(saved.getComponentUuid()).isEqualTo(COMPONENT_UUID_1);
assertThat(saved.getStatus()).isEqualTo(PENDING);
assertThat(saved.getSubmitterLogin()).isEqualTo(SUBMITTER_LOGIN);
- assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID);
+ assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID_1);
assertThat(saved.getExecutionCount()).isEqualTo(EXECUTION_COUNT);
assertThat(saved.getCreatedAt()).isEqualTo(6_888_777L);
assertThat(saved.getUpdatedAt()).isEqualTo(8_000_999L);
assertThat(saved.getStartedAt()).isNull();
}
- private void mockSystem2ForSingleCall(System2 system2, long now) {
- Mockito.reset(system2);
- when(system2.now()).thenReturn(now).thenThrow(new IllegalStateException("now should be called only once"));
- }
-
@Test
public void test_selectByUuid() {
insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING);
verifyCeQueueStatuses(TASK_UUID_1, PENDING, TASK_UUID_2, PENDING, TASK_UUID_3, PENDING);
}
+ @Test
+ public void resetAllToPendingStatus_updates_updatedAt() {
+ long now = 1_334_333L;
+ insert(TASK_UUID_1, COMPONENT_UUID_1, IN_PROGRESS);
+ insert(TASK_UUID_2, COMPONENT_UUID_1, IN_PROGRESS);
+ mockSystem2ForSingleCall(now);
+
+ underTestWithSystem2Mock.resetAllToPendingStatus(db.getSession());
+
+ assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_1).get().getUpdatedAt()).isEqualTo(now);
+ assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_2).get().getUpdatedAt()).isEqualTo(now);
+ }
+
+ @Test
+ public void resetAllToPendingStatus_resets_startedAt() {
+ assertThat(insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING).getStartedAt()).isNull();
+ assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).get().getUuid()).isEqualTo(TASK_UUID_1);
+ assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_1).get().getStartedAt()).isNotNull();
+
+ underTest.resetAllToPendingStatus(db.getSession());
+
+ assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_1).get().getStartedAt()).isNull();
+ }
+
+ @Test
+ public void resetAllToPendingStatus_does_not_reset_workerUuid_nor_executionCount() {
+ CeQueueDto dto = new CeQueueDto()
+ .setUuid(TASK_UUID_1)
+ .setTaskType(CeTaskTypes.REPORT)
+ .setComponentUuid(COMPONENT_UUID_1)
+ .setStatus(IN_PROGRESS)
+ .setSubmitterLogin(SUBMITTER_LOGIN)
+ .setWorkerUuid(WORKER_UUID_1)
+ .setExecutionCount(EXECUTION_COUNT);
+ underTest.insert(db.getSession(), dto);
+ db.commit();
+
+ underTest.resetAllToPendingStatus(db.getSession());
+
+ CeQueueDto saved = underTest.selectByUuid(db.getSession(), TASK_UUID_1).get();
+ assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID_1);
+ assertThat(saved.getExecutionCount()).isEqualTo(EXECUTION_COUNT);
+ }
+
@Test
public void peek_none_if_no_pendings() throws Exception {
- assertThat(underTest.peek(db.getSession()).isPresent()).isFalse();
+ assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();
// not pending, but in progress
insert(TASK_UUID_1, COMPONENT_UUID_1, IN_PROGRESS);
- assertThat(underTest.peek(db.getSession()).isPresent()).isFalse();
+ assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();
}
@Test
verifyCeQueueStatuses(TASK_UUID_1, PENDING, TASK_UUID_2, PENDING);
// peek first one
- Optional<CeQueueDto> peek = underTest.peek(db.getSession());
+ Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1);
assertThat(peek.isPresent()).isTrue();
assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1);
assertThat(peek.get().getStatus()).isEqualTo(IN_PROGRESS);
+ assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_1);
+ assertThat(peek.get().getExecutionCount()).isEqualTo(1);
verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, PENDING);
// peek second one
- peek = underTest.peek(db.getSession());
+ peek = underTest.peek(db.getSession(), WORKER_UUID_2);
assertThat(peek.isPresent()).isTrue();
assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_2);
assertThat(peek.get().getStatus()).isEqualTo(IN_PROGRESS);
+ assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_2);
+ assertThat(peek.get().getExecutionCount()).isEqualTo(1);
verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, IN_PROGRESS);
// no more pendings
- assertThat(underTest.peek(db.getSession()).isPresent()).isFalse();
+ assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();
}
@Test
system2.setNow(INIT_TIME + 3_000_000);
insert(TASK_UUID_2, COMPONENT_UUID_1, PENDING);
- Optional<CeQueueDto> peek = underTest.peek(db.getSession());
+ Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1);
assertThat(peek.isPresent()).isTrue();
assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1);
+ assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_1);
+ assertThat(peek.get().getExecutionCount()).isEqualTo(1);
verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, PENDING);
// do not peek second task as long as the first one is in progress
- peek = underTest.peek(db.getSession());
+ peek = underTest.peek(db.getSession(), WORKER_UUID_1);
assertThat(peek.isPresent()).isFalse();
// first one is finished
underTest.deleteByUuid(db.getSession(), TASK_UUID_1);
- peek = underTest.peek(db.getSession());
+ peek = underTest.peek(db.getSession(), WORKER_UUID_2);
assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_2);
+ assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_2);
+ assertThat(peek.get().getExecutionCount()).isEqualTo(1);
}
@Test
db.commit();
}
- private void insert(String uuid, String componentUuid, CeQueueDto.Status status) {
+ private CeQueueDto insert(String uuid, String componentUuid, CeQueueDto.Status status) {
CeQueueDto dto = new CeQueueDto();
dto.setUuid(uuid);
dto.setTaskType(CeTaskTypes.REPORT);
dto.setSubmitterLogin("henri");
underTest.insert(db.getSession(), dto);
db.getSession().commit();
+ return dto;
}
private static Iterable<Map<String, Object>> upperizeKeys(List<Map<String, Object>> select) {
private static Map<String, Object> rowMap(String uuid, CeQueueDto.Status status) {
return ImmutableMap.of("UUID", uuid, "STATUS", status.name());
}
+
+ private void mockSystem2ForSingleCall(long now) {
+ Mockito.reset(mockedSystem2);
+ when(mockedSystem2.now())
+ .thenReturn(now)
+ .thenThrow(new IllegalStateException("now should be called only once"));
+ }
}
.add(1625, "Populate column ORGANIZATIONS.DEFAULT_GROUP_ID", PopulateColumnDefaultGroupIdOfOrganizations.class)
.add(1626, "Clean orphan rows in table GROUPS_USERS", CleanOrphanRowsInGroupsUsers.class)
.add(1627, "Delete permission templates linked to removed users", DeletePermissionTemplatesLinkedToRemovedUsers.class)
- ;
- .add(1628, "Add columns CE_QUEUE.WORKER_UUID and EXECUTION_COUNT", AddCeQueueWorkerUuidAndExecutionCount.class);
- .add(1629, "Make CE_QUEUE.EXECUTION_COUNT not nullable", MakeCeQueueExecutionCountNotNullable.class);
+ .add(1628, "Add columns CE_QUEUE.WORKER_UUID and EXECUTION_COUNT", AddCeQueueWorkerUuidAndExecutionCount.class)
+ .add(1629, "Make CE_QUEUE.EXECUTION_COUNT not nullable", MakeCeQueueExecutionCountNotNullable.class)
.add(1630, "Add columns CE_ACTIVITY.WORKER_UUID and EXECUTION_COUNT", AddCeActivityWorkerUuidAndExecutionCount.class)
.add(1631, "Make columns CE_ACTIVITY.EXECUTION_COUNT not nullable", MakeCeActivityExecutionCountNotNullable.class);
}
@Test
public void verify_migration_count() {
- verifyMigrationCount(underTest, 31);
+ verifyMigrationCount(underTest, 32);
}
}
*
* <p>An unchecked exception may be thrown on technical errors (db connection, ...).</p>
*/
- Optional<CeTask> peek();
+ Optional<CeTask> peek(String workerUuid);
/**
* Removes all the tasks from the queue, whatever their status. They are marked
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
@ComputeEngineSide
public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue {
}
@Override
- public Optional<CeTask> peek() {
+ public Optional<CeTask> peek(String workerUuid) {
+ requireNonNull(workerUuid, "workerUuid can't be null");
+
if (peekPaused.get()) {
return Optional.absent();
}
try (DbSession dbSession = dbClient.openSession(false)) {
- Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession);
+ Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession, workerUuid);
CeTask task = null;
if (dto.isPresent()) {
task = loadTask(dbSession, dto.get());
private Optional<CeTask> tryAndFindTaskToExecute() {
try {
- return queue.peek();
+ return queue.peek("UNKNOWN" /*FIXME provide a real worker uuid*/);
} catch (Exception e) {
LOG.error("Failed to pop the queue of analysis reports", e);
}
public class CeQueueImplTest {
+ private static final String WORKER_UUID = "workerUuid";
private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L);
@Rule
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
- dbTester.getDbClient().ceQueueDao().peek(session);
+ dbTester.getDbClient().ceQueueDao().peek(session, WORKER_UUID);
underTest.cancel(task.getUuid());
}
CeTask pendingTask1 = submit(CeTaskTypes.REPORT, "PROJECT_2");
CeTask pendingTask2 = submit(CeTaskTypes.REPORT, "PROJECT_3");
- dbTester.getDbClient().ceQueueDao().peek(session);
+ dbTester.getDbClient().ceQueueDao().peek(session, WORKER_UUID);
int canceledCount = underTest.cancelAll();
assertThat(canceledCount).isEqualTo(2);
public class InternalCeQueueImplTest {
private static final String AN_ANALYSIS_UUID = "U1";
+ private static final String WORKER_UUID_1 = "worker uuid 1";
+ private static final String WORKER_UUID_2 = "worker uuid 2";
private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L);
verifyCeTask(taskSubmit2, tasks.get(1), null);
}
+ @Test
+ public void peek_throws_NPE_if_workerUUid_is_null() {
+ expectedException.expect(NullPointerException.class);
+ expectedException.expectMessage("workerUuid can't be null");
+
+ underTest.peek(null);
+ }
+
@Test
public void test_remove() {
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
- Optional<CeTask> peek = underTest.peek();
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, null, null);
// queue is empty
assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), task.getUuid()).isPresent()).isFalse();
- assertThat(underTest.peek().isPresent()).isFalse();
+ assertThat(underTest.peek(WORKER_UUID_2).isPresent()).isFalse();
// available in history
Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid());
@Test
public void remove_does_not_set_analysisUuid_in_CeActivity_when_CeTaskResult_has_no_analysis_uuid() {
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
- Optional<CeTask> peek = underTest.peek();
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(null), null);
// available in history
public void remove_sets_snapshotId_in_CeActivity_when_CeTaskResult_has_no_snapshot_id() {
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
- Optional<CeTask> peek = underTest.peek();
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_2);
underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(AN_ANALYSIS_UUID), null);
// available in history
Throwable error = new NullPointerException("Fake NPE to test persistence to DB");
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
- Optional<CeTask> peek = underTest.peek();
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error);
Optional<CeActivityDto> activityDto = dbTester.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid());
public void test_peek() throws Exception {
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
- Optional<CeTask> peek = underTest.peek();
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
assertThat(peek.isPresent()).isTrue();
assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT);
assertThat(peek.get().getComponentUuid()).isEqualTo("PROJECT_1");
// no more pending tasks
- peek = underTest.peek();
+ peek = underTest.peek(WORKER_UUID_2);
assertThat(peek.isPresent()).isFalse();
}
submit(CeTaskTypes.REPORT, "PROJECT_1");
underTest.pausePeek();
- Optional<CeTask> peek = underTest.peek();
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
assertThat(peek.isPresent()).isFalse();
}
expectedException.expectMessage(startsWith("Task is in progress and can't be canceled"));
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
- underTest.peek();
+ underTest.peek(WORKER_UUID_2);
underTest.cancel(task.getUuid());
}
CeTask inProgressTask = submit(CeTaskTypes.REPORT, "PROJECT_1");
CeTask pendingTask1 = submit(CeTaskTypes.REPORT, "PROJECT_2");
CeTask pendingTask2 = submit(CeTaskTypes.REPORT, "PROJECT_3");
- underTest.peek();
+ underTest.peek(WORKER_UUID_2);
int canceledCount = underTest.cancelAll();
assertThat(canceledCount).isEqualTo(2);
public class CeWorkerCallableImplTest {
+ private static final String UNKNOWN_WORKER_UUID = "UNKNOWN";
+
@Rule
public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule();
@Rule
@Test
public void no_pending_tasks_in_queue() throws Exception {
- when(queue.peek()).thenReturn(Optional.<CeTask>absent());
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.<CeTask>absent());
assertThat(underTest.call()).isFalse();
public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception {
CeTask task = createCeTask(null);
taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
- when(queue.peek()).thenReturn(Optional.of(task));
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task));
assertThat(underTest.call()).isTrue();
public void peek_and_process_task() throws Exception {
CeTask task = createCeTask(null);
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
- when(queue.peek()).thenReturn(Optional.of(task));
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task));
assertThat(underTest.call()).isTrue();
@Test
public void fail_to_process_task() throws Exception {
CeTask task = createCeTask(null);
- when(queue.peek()).thenReturn(Optional.of(task));
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task));
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
Throwable error = makeTaskProcessorFail(task);
@Test
public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_success() throws Exception {
- when(queue.peek()).thenReturn(Optional.of(createCeTask(null)));
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask(null)));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
underTest.call();
@Test
public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_error() throws Exception {
CeTask ceTask = createCeTask(null);
- when(queue.peek()).thenReturn(Optional.of(ceTask));
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
makeTaskProcessorFail(ceTask);
@Test
public void display_submitterLogin_in_logs_when_set_in_case_of_success() throws Exception {
- when(queue.peek()).thenReturn(Optional.of(createCeTask("FooBar")));
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask("FooBar")));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
underTest.call();
@Test
public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception {
CeTask ceTask = createCeTask("FooBar");
- when(queue.peek()).thenReturn(Optional.of(ceTask));
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
makeTaskProcessorFail(ceTask);
public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception {
logTester.setLevel(LoggerLevel.DEBUG);
- when(queue.peek()).thenReturn(Optional.of(createCeTask("FooBar")));
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask("FooBar")));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
underTest.call();
logTester.setLevel(LoggerLevel.DEBUG);
CeTask ceTask = createCeTask("FooBar");
- when(queue.peek()).thenReturn(Optional.of(ceTask));
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
makeTaskProcessorFail(ceTask);