expectedException.expect(NullPointerException.class);
expectedException.expectMessage("workerUuid can't be null");
- underTest.peek(null, false, false);
+ underTest.peek(null, true);
}
@Test
public void test_remove() {
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
- Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, null, null);
// queue is empty
assertThat(db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid())).isNotPresent();
- assertThat(underTest.peek(WORKER_UUID_2, false, false)).isNotPresent();
+ assertThat(underTest.peek(WORKER_UUID_2, true)).isNotPresent();
// available in history
Optional<CeActivityDto> history = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid());
@Test
public void remove_does_not_set_analysisUuid_in_CeActivity_when_CeTaskResult_has_no_analysis_uuid() {
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
- Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(null), null);
// available in history
public void remove_sets_analysisUuid_in_CeActivity_when_CeTaskResult_has_analysis_uuid() {
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
- Optional<CeTask> peek = underTest.peek(WORKER_UUID_2, false, false);
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_2, true);
underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(AN_ANALYSIS_UUID), null);
// available in history
Throwable error = new NullPointerException("Fake NPE to test persistence to DB");
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
- Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error);
Optional<CeActivityDto> activityDto = db.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid());
Throwable error = new TypedExceptionImpl("aType", "aMessage");
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
- Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error);
CeActivityDto activityDto = db.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid()).get();
public void test_peek() {
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
- Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
assertThat(peek.isPresent()).isTrue();
assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT);
assertThat(peek.get().getMainComponent()).contains(peek.get().getComponent().get());
// no more pending tasks
- peek = underTest.peek(WORKER_UUID_2, false, false);
+ peek = underTest.peek(WORKER_UUID_2, true);
assertThat(peek.isPresent()).isFalse();
}
ComponentDto branch = db.components().insertProjectBranch(project);
CeTask task = submit(CeTaskTypes.REPORT, branch);
- Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
assertThat(peek.isPresent()).isTrue();
assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT);
assertThat(peek.get().getMainComponent()).contains(new CeTask.Component(project.uuid(), project.getDbKey(), project.name()));
// no more pending tasks
- peek = underTest.peek(WORKER_UUID_2, false, false);
+ peek = underTest.peek(WORKER_UUID_2, true);
assertThat(peek.isPresent()).isFalse();
}
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
underTest.pauseWorkers();
- Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
assertThat(peek).isEmpty();
underTest.resumeWorkers();
- peek = underTest.peek(WORKER_UUID_1, false, false);
+ peek = underTest.peek(WORKER_UUID_1, true);
assertThat(peek).isPresent();
assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
}
makeInProgress(dto, "foo");
db.commit();
- assertThat(underTest.peek(WORKER_UUID_1, false, false)).isEmpty();
+ assertThat(underTest.peek(WORKER_UUID_1, true)).isEmpty();
}
@Test
submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
when(computeEngineStatus.getStatus()).thenReturn(STOPPING);
- Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
assertThat(peek.isPresent()).isFalse();
}
.setStatus(CeQueueDto.Status.PENDING));
db.commit();
- assertThat(underTest.peek(WORKER_UUID_1, false, false).get().getUuid()).isEqualTo("uuid");
+ assertThat(underTest.peek(WORKER_UUID_1, true).get().getUuid()).isEqualTo("uuid");
}
@Test
CeQueueDto u1 = insertPending("u1");// will be picked-because older than any of the reset ones
CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_1);// will be reset
- assertThat(underTest.peek(WORKER_UUID_1, false, false).get().getUuid()).isEqualTo("u0");
+ assertThat(underTest.peek(WORKER_UUID_1, true).get().getUuid()).isEqualTo("u0");
verifyUnmodifiedTask(u1);
verifyResetTask(u2);
CeQueueDto u3 = insertInProgress("u3", WORKER_UUID_1);
CeQueueDto u4 = insertInProgress("u4", WORKER_UUID_2);
- assertThat(underTest.peek(WORKER_UUID_1, false, false).get().getUuid()).isEqualTo("u0");
+ assertThat(underTest.peek(WORKER_UUID_1, true).get().getUuid()).isEqualTo("u0");
verifyResetTask(u1);
verifyUnmodifiedTask(u2);
@Test
public void fail_to_cancel_if_in_progress() {
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
- underTest.peek(WORKER_UUID_2, false, false);
+ underTest.peek(WORKER_UUID_2, true);
CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get();
expectedException.expect(IllegalStateException.class);
CeTask inProgressTask = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
CeTask pendingTask1 = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_2"));
CeTask pendingTask2 = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_3"));
- underTest.peek(WORKER_UUID_2, false, false);
+ underTest.peek(WORKER_UUID_2, true);
int canceledCount = underTest.cancelAll();
assertThat(canceledCount).isEqualTo(2);
@Test
public void no_pending_tasks_in_queue() throws Exception {
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.empty());
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.empty());
assertThat(underTest.call()).isEqualTo(NO_TASK);
@Test
public void no_pending_tasks_in_queue_without_listener() throws Exception {
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.empty());
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.empty());
assertThat(underTestNoListener.call()).isEqualTo(NO_TASK);
public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception {
CeTask task = createCeTask(null);
taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
public void fail_when_no_CeTaskProcessor_is_found_in_repository_without_listener() throws Exception {
CeTask task = createCeTask(null);
taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
assertThat(underTestNoListener.call()).isEqualTo(TASK_PROCESSED);
public void peek_and_process_task() throws Exception {
CeTask task = createCeTask(null);
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
public void peek_and_process_task_without_listeners() throws Exception {
CeTask task = createCeTask(null);
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
assertThat(underTestNoListener.call()).isEqualTo(TASK_PROCESSED);
@Test
public void fail_to_process_task() throws Exception {
CeTask task = createCeTask(null);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
Throwable error = makeTaskProcessorFail(task);
@Test
public void fail_to_process_task_without_listeners() throws Exception {
CeTask task = createCeTask(null);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
Throwable error = makeTaskProcessorFail(task);
@Test
public void log_task_characteristics() throws Exception {
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(null, "pullRequest", "123", "branch", "foo")));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(null, "pullRequest", "123", "branch", "foo")));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
underTest.call();
@Test
public void do_not_log_submitter_param_if_anonymous_and_success() throws Exception {
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(null)));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(null)));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
underTest.call();
@Test
public void do_not_log_submitter_param_if_anonymous_and_error() throws Exception {
CeTask ceTask = createCeTask(null);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
makeTaskProcessorFail(ceTask);
@Test
public void log_submitter_login_if_authenticated_and_success() throws Exception {
UserDto userDto = insertRandomUser();
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(toTaskSubmitter(userDto))));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(toTaskSubmitter(userDto))));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
underTest.call();
@Test
public void log_submitterUuid_if_user_matching_submitterUuid_can_not_be_found() throws Exception {
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(new CeTask.User("UUID_USER", null))));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(new CeTask.User("UUID_USER", null))));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
underTest.call();
public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception {
UserDto userDto = insertRandomUser();
CeTask ceTask = createCeTask(toTaskSubmitter(userDto));
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
makeTaskProcessorFail(ceTask);
public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception {
logTester.setLevel(LoggerLevel.DEBUG);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter)));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter)));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
underTest.call();
logTester.setLevel(LoggerLevel.DEBUG);
CeTask ceTask = createCeTask(submitter);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
makeTaskProcessorFail(ceTask);
@Test
public void call_sets_and_restores_thread_name_with_information_of_worker_when_there_is_no_task_to_process() throws Exception {
String threadName = randomAlphabetic(3);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer(invocation -> {
+ when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> {
assertThat(Thread.currentThread().getName())
.isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
return Optional.empty();
@Test
public void call_sets_and_restores_thread_name_with_information_of_worker_when_a_task_is_processed() throws Exception {
String threadName = randomAlphabetic(3);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer(invocation -> {
+ when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> {
assertThat(Thread.currentThread().getName())
.isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
return Optional.of(createCeTask(submitter));
public void call_sets_and_restores_thread_name_with_information_of_worker_when_an_error_occurs() throws Exception {
String threadName = randomAlphabetic(3);
CeTask ceTask = createCeTask(submitter);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer(invocation -> {
+ when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> {
assertThat(Thread.currentThread().getName())
.isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
return Optional.of(ceTask);
@Test
public void log_error_when_task_fails_with_not_MessageException() throws Exception {
CeTask ceTask = createCeTask(submitter);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
makeTaskProcessorFail(ceTask);
@Test
public void do_no_log_error_when_task_fails_with_MessageException() throws Exception {
CeTask ceTask = createCeTask(submitter);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
makeTaskProcessorFail(ceTask, MessageException.of("simulate MessageException thrown by TaskProcessor#process"));
@Test
public void log_error_when_task_was_successful_but_ending_state_can_not_be_persisted_to_db() throws Exception {
CeTask ceTask = createCeTask(submitter);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
doThrow(new RuntimeException("Simulate queue#remove failing")).when(queue).remove(ceTask, CeActivityDto.Status.SUCCESS, null, null);
@Test
public void log_error_when_task_failed_and_ending_state_can_not_be_persisted_to_db() throws Exception {
CeTask ceTask = createCeTask(submitter);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
IllegalStateException ex = makeTaskProcessorFail(ceTask);
RuntimeException runtimeException = new RuntimeException("Simulate queue#remove failing");
@Test
public void log_error_as_suppressed_when_task_failed_with_MessageException_and_ending_state_can_not_be_persisted_to_db() throws Exception {
CeTask ceTask = createCeTask(submitter);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
MessageException ex = makeTaskProcessorFail(ceTask, MessageException.of("simulate MessageException thrown by TaskProcessor#process"));
RuntimeException runtimeException = new RuntimeException("Simulate queue#remove failing");
CountDownLatch inCallLatch = new CountDownLatch(1);
CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
// mock long running peek(String) call => Thread is executing call() but not running a task
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
+ when(queue.peek(anyString(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
inCallLatch.countDown();
try {
assertionsDoneLatch.await(10, TimeUnit.SECONDS);
String taskType = randomAlphabetic(12);
CeTask ceTask = mock(CeTask.class);
when(ceTask.getType()).thenReturn(taskType);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() {
@CheckForNull
@Override
@Test
public void do_not_exclude_portfolio_when_indexation_task_lookup_is_disabled() throws Exception {
// first call with empty queue to disable indexationTaskLookupEnabled
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.empty());
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.empty());
assertThat(underTest.call()).isEqualTo(NO_TASK);
- ArgumentCaptor<Boolean> booleanCaptor = ArgumentCaptor.forClass(Boolean.class);
// following calls should not exclude portfolios
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter)));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter)));
assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
- verify(queue, times(3)).peek(anyString(), anyBoolean(), booleanCaptor.capture());
- assertThat(booleanCaptor.getAllValues()).containsExactly(true, true, false);
+ verify(queue, times(2)).peek(anyString(), anyBoolean());
}
@Test
CountDownLatch inCallLatch = new CountDownLatch(1);
CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
// mock long running peek(String) call => Thread is executing call() but not running a task
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
+ when(queue.peek(anyString(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
inCallLatch.countDown();
try {
assertionsDoneLatch.await(10, TimeUnit.SECONDS);
String taskType = randomAlphabetic(12);
CeTask ceTask = mock(CeTask.class);
when(ceTask.getType()).thenReturn(taskType);
- when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() {
@CheckForNull
}
private void verifyWorkerUuid() {
- verify(queue, atLeastOnce()).peek(workerUuidCaptor.capture(), anyBoolean(), anyBoolean());
+ verify(queue, atLeastOnce()).peek(workerUuidCaptor.capture(), anyBoolean());
assertThat(workerUuidCaptor.getValue()).isEqualTo(workerUuid);
}