|
|
@@ -146,7 +146,7 @@ public class CeWorkerImplTest { |
|
|
|
|
|
|
|
@Test |
|
|
|
public void no_pending_tasks_in_queue() throws Exception { |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.empty()); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.empty()); |
|
|
|
|
|
|
|
assertThat(underTest.call()).isEqualTo(NO_TASK); |
|
|
|
|
|
|
@@ -155,7 +155,7 @@ public class CeWorkerImplTest { |
|
|
|
|
|
|
|
@Test |
|
|
|
public void no_pending_tasks_in_queue_without_listener() throws Exception { |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.empty()); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.empty()); |
|
|
|
|
|
|
|
assertThat(underTestNoListener.call()).isEqualTo(NO_TASK); |
|
|
|
|
|
|
@@ -166,7 +166,7 @@ public class CeWorkerImplTest { |
|
|
|
public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception { |
|
|
|
CeTask task = createCeTask(null); |
|
|
|
taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task)); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task)); |
|
|
|
|
|
|
|
assertThat(underTest.call()).isEqualTo(TASK_PROCESSED); |
|
|
|
|
|
|
@@ -182,7 +182,7 @@ public class CeWorkerImplTest { |
|
|
|
public void fail_when_no_CeTaskProcessor_is_found_in_repository_without_listener() throws Exception { |
|
|
|
CeTask task = createCeTask(null); |
|
|
|
taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task)); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task)); |
|
|
|
|
|
|
|
assertThat(underTestNoListener.call()).isEqualTo(TASK_PROCESSED); |
|
|
|
|
|
|
@@ -195,7 +195,7 @@ public class CeWorkerImplTest { |
|
|
|
public void peek_and_process_task() throws Exception { |
|
|
|
CeTask task = createCeTask(null); |
|
|
|
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task)); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task)); |
|
|
|
|
|
|
|
assertThat(underTest.call()).isEqualTo(TASK_PROCESSED); |
|
|
|
|
|
|
@@ -212,7 +212,7 @@ public class CeWorkerImplTest { |
|
|
|
public void peek_and_process_task_without_listeners() throws Exception { |
|
|
|
CeTask task = createCeTask(null); |
|
|
|
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task)); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task)); |
|
|
|
|
|
|
|
assertThat(underTestNoListener.call()).isEqualTo(TASK_PROCESSED); |
|
|
|
|
|
|
@@ -225,7 +225,7 @@ public class CeWorkerImplTest { |
|
|
|
@Test |
|
|
|
public void fail_to_process_task() throws Exception { |
|
|
|
CeTask task = createCeTask(null); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task)); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task)); |
|
|
|
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); |
|
|
|
Throwable error = makeTaskProcessorFail(task); |
|
|
|
|
|
|
@@ -243,7 +243,7 @@ public class CeWorkerImplTest { |
|
|
|
@Test |
|
|
|
public void fail_to_process_task_without_listeners() throws Exception { |
|
|
|
CeTask task = createCeTask(null); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task)); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task)); |
|
|
|
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); |
|
|
|
Throwable error = makeTaskProcessorFail(task); |
|
|
|
|
|
|
@@ -257,7 +257,7 @@ public class CeWorkerImplTest { |
|
|
|
|
|
|
|
@Test |
|
|
|
public void log_task_characteristics() throws Exception { |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(null, "pullRequest", "123", "branch", "foo"))); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(null, "pullRequest", "123", "branch", "foo"))); |
|
|
|
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); |
|
|
|
|
|
|
|
underTest.call(); |
|
|
@@ -272,7 +272,7 @@ public class CeWorkerImplTest { |
|
|
|
|
|
|
|
@Test |
|
|
|
public void do_not_log_submitter_param_if_anonymous_and_success() throws Exception { |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(null))); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(null))); |
|
|
|
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); |
|
|
|
|
|
|
|
underTest.call(); |
|
|
@@ -288,7 +288,7 @@ public class CeWorkerImplTest { |
|
|
|
@Test |
|
|
|
public void do_not_log_submitter_param_if_anonymous_and_error() throws Exception { |
|
|
|
CeTask ceTask = createCeTask(null); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor); |
|
|
|
makeTaskProcessorFail(ceTask); |
|
|
|
|
|
|
@@ -308,7 +308,7 @@ public class CeWorkerImplTest { |
|
|
|
@Test |
|
|
|
public void log_submitter_login_if_authenticated_and_success() throws Exception { |
|
|
|
UserDto userDto = insertRandomUser(); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(toTaskSubmitter(userDto)))); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(toTaskSubmitter(userDto)))); |
|
|
|
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); |
|
|
|
|
|
|
|
underTest.call(); |
|
|
@@ -324,7 +324,7 @@ public class CeWorkerImplTest { |
|
|
|
|
|
|
|
@Test |
|
|
|
public void log_submitterUuid_if_user_matching_submitterUuid_can_not_be_found() throws Exception { |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(new CeTask.User("UUID_USER", null)))); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(new CeTask.User("UUID_USER", null)))); |
|
|
|
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); |
|
|
|
|
|
|
|
underTest.call(); |
|
|
@@ -342,7 +342,7 @@ public class CeWorkerImplTest { |
|
|
|
public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception { |
|
|
|
UserDto userDto = insertRandomUser(); |
|
|
|
CeTask ceTask = createCeTask(toTaskSubmitter(userDto)); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor); |
|
|
|
makeTaskProcessorFail(ceTask); |
|
|
|
|
|
|
@@ -362,7 +362,7 @@ public class CeWorkerImplTest { |
|
|
|
public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception { |
|
|
|
logTester.setLevel(LoggerLevel.DEBUG); |
|
|
|
|
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter))); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter))); |
|
|
|
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); |
|
|
|
|
|
|
|
underTest.call(); |
|
|
@@ -381,7 +381,7 @@ public class CeWorkerImplTest { |
|
|
|
logTester.setLevel(LoggerLevel.DEBUG); |
|
|
|
|
|
|
|
CeTask ceTask = createCeTask(submitter); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); |
|
|
|
makeTaskProcessorFail(ceTask); |
|
|
|
|
|
|
@@ -401,7 +401,7 @@ public class CeWorkerImplTest { |
|
|
|
@Test |
|
|
|
public void call_sets_and_restores_thread_name_with_information_of_worker_when_there_is_no_task_to_process() throws Exception { |
|
|
|
String threadName = randomAlphabetic(3); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer(invocation -> { |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> { |
|
|
|
assertThat(Thread.currentThread().getName()) |
|
|
|
.isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName); |
|
|
|
return Optional.empty(); |
|
|
@@ -415,7 +415,7 @@ public class CeWorkerImplTest { |
|
|
|
@Test |
|
|
|
public void call_sets_and_restores_thread_name_with_information_of_worker_when_a_task_is_processed() throws Exception { |
|
|
|
String threadName = randomAlphabetic(3); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer(invocation -> { |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> { |
|
|
|
assertThat(Thread.currentThread().getName()) |
|
|
|
.isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName); |
|
|
|
return Optional.of(createCeTask(submitter)); |
|
|
@@ -431,7 +431,7 @@ public class CeWorkerImplTest { |
|
|
|
public void call_sets_and_restores_thread_name_with_information_of_worker_when_an_error_occurs() throws Exception { |
|
|
|
String threadName = randomAlphabetic(3); |
|
|
|
CeTask ceTask = createCeTask(submitter); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer(invocation -> { |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> { |
|
|
|
assertThat(Thread.currentThread().getName()) |
|
|
|
.isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName); |
|
|
|
return Optional.of(ceTask); |
|
|
@@ -459,7 +459,7 @@ public class CeWorkerImplTest { |
|
|
|
@Test |
|
|
|
public void log_error_when_task_fails_with_not_MessageException() throws Exception { |
|
|
|
CeTask ceTask = createCeTask(submitter); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); |
|
|
|
makeTaskProcessorFail(ceTask); |
|
|
|
|
|
|
@@ -477,7 +477,7 @@ public class CeWorkerImplTest { |
|
|
|
@Test |
|
|
|
public void do_no_log_error_when_task_fails_with_MessageException() throws Exception { |
|
|
|
CeTask ceTask = createCeTask(submitter); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); |
|
|
|
makeTaskProcessorFail(ceTask, MessageException.of("simulate MessageException thrown by TaskProcessor#process")); |
|
|
|
|
|
|
@@ -493,7 +493,7 @@ public class CeWorkerImplTest { |
|
|
|
@Test |
|
|
|
public void log_error_when_task_was_successful_but_ending_state_can_not_be_persisted_to_db() throws Exception { |
|
|
|
CeTask ceTask = createCeTask(submitter); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); |
|
|
|
doThrow(new RuntimeException("Simulate queue#remove failing")).when(queue).remove(ceTask, CeActivityDto.Status.SUCCESS, null, null); |
|
|
|
|
|
|
@@ -505,7 +505,7 @@ public class CeWorkerImplTest { |
|
|
|
@Test |
|
|
|
public void log_error_when_task_failed_and_ending_state_can_not_be_persisted_to_db() throws Exception { |
|
|
|
CeTask ceTask = createCeTask(submitter); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); |
|
|
|
IllegalStateException ex = makeTaskProcessorFail(ceTask); |
|
|
|
RuntimeException runtimeException = new RuntimeException("Simulate queue#remove failing"); |
|
|
@@ -534,7 +534,7 @@ public class CeWorkerImplTest { |
|
|
|
@Test |
|
|
|
public void log_error_as_suppressed_when_task_failed_with_MessageException_and_ending_state_can_not_be_persisted_to_db() throws Exception { |
|
|
|
CeTask ceTask = createCeTask(submitter); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); |
|
|
|
MessageException ex = makeTaskProcessorFail(ceTask, MessageException.of("simulate MessageException thrown by TaskProcessor#process")); |
|
|
|
RuntimeException runtimeException = new RuntimeException("Simulate queue#remove failing"); |
|
|
@@ -565,7 +565,7 @@ public class CeWorkerImplTest { |
|
|
|
CountDownLatch inCallLatch = new CountDownLatch(1); |
|
|
|
CountDownLatch assertionsDoneLatch = new CountDownLatch(1); |
|
|
|
// mock long running peek(String) call => Thread is executing call() but not running a task |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> { |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> { |
|
|
|
inCallLatch.countDown(); |
|
|
|
try { |
|
|
|
assertionsDoneLatch.await(10, TimeUnit.SECONDS); |
|
|
@@ -600,7 +600,7 @@ public class CeWorkerImplTest { |
|
|
|
String taskType = randomAlphabetic(12); |
|
|
|
CeTask ceTask = mock(CeTask.class); |
|
|
|
when(ceTask.getType()).thenReturn(taskType); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() { |
|
|
|
@CheckForNull |
|
|
|
@Override |
|
|
@@ -641,15 +641,13 @@ public class CeWorkerImplTest { |
|
|
|
@Test |
|
|
|
public void do_not_exclude_portfolio_when_indexation_task_lookup_is_disabled() throws Exception { |
|
|
|
// first call with empty queue to disable indexationTaskLookupEnabled |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.empty()); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.empty()); |
|
|
|
assertThat(underTest.call()).isEqualTo(NO_TASK); |
|
|
|
|
|
|
|
ArgumentCaptor<Boolean> booleanCaptor = ArgumentCaptor.forClass(Boolean.class); |
|
|
|
// following calls should not exclude portfolios |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter))); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter))); |
|
|
|
assertThat(underTest.call()).isEqualTo(TASK_PROCESSED); |
|
|
|
verify(queue, times(3)).peek(anyString(), anyBoolean(), booleanCaptor.capture()); |
|
|
|
assertThat(booleanCaptor.getAllValues()).containsExactly(true, true, false); |
|
|
|
verify(queue, times(2)).peek(anyString(), anyBoolean()); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
@@ -657,7 +655,7 @@ public class CeWorkerImplTest { |
|
|
|
CountDownLatch inCallLatch = new CountDownLatch(1); |
|
|
|
CountDownLatch assertionsDoneLatch = new CountDownLatch(1); |
|
|
|
// mock long running peek(String) call => Thread is executing call() but not running a task |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> { |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> { |
|
|
|
inCallLatch.countDown(); |
|
|
|
try { |
|
|
|
assertionsDoneLatch.await(10, TimeUnit.SECONDS); |
|
|
@@ -688,7 +686,7 @@ public class CeWorkerImplTest { |
|
|
|
String taskType = randomAlphabetic(12); |
|
|
|
CeTask ceTask = mock(CeTask.class); |
|
|
|
when(ceTask.getType()).thenReturn(taskType); |
|
|
|
when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); |
|
|
|
taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() { |
|
|
|
|
|
|
|
@CheckForNull |
|
|
@@ -745,7 +743,7 @@ public class CeWorkerImplTest { |
|
|
|
} |
|
|
|
|
|
|
|
private void verifyWorkerUuid() { |
|
|
|
verify(queue, atLeastOnce()).peek(workerUuidCaptor.capture(), anyBoolean(), anyBoolean()); |
|
|
|
verify(queue, atLeastOnce()).peek(workerUuidCaptor.capture(), anyBoolean()); |
|
|
|
assertThat(workerUuidCaptor.getValue()).isEqualTo(workerUuid); |
|
|
|
} |
|
|
|
|