try {
queue.remove(task, status, taskResult, error);
} catch (Exception e) {
- LOG.error(format("Failed to finalize task with uuid '%s' and persist its state to db", task.getUuid()), e);
+ String errorMessage = format("Failed to finalize task with uuid '%s' and persist its state to db", task.getUuid());
+ if (error instanceof MessageException) {
+ LOG.error(format("%s. Task failed with MessageException \"%s\"", errorMessage, error.getMessage()), e);
+ } else {
+ LOG.error(errorMessage, e);
+ }
} finally {
stopActivityProfiler(ceProfiler, task, status);
ceLogging.clearForTask();
makeTaskProcessorFail(ceTask);
underTest.call();
+
List<String> logs = logTester.logs(LoggerLevel.ERROR);
assertThat(logs).hasSize(2);
assertThat(logs.get(0)).isEqualTo("Failed to execute task " + ceTask.getUuid());
makeTaskProcessorFail(ceTask, MessageException.of("simulate MessageException thrown by TaskProcessor#process"));
underTest.call();
+
List<String> logs = logTester.logs(LoggerLevel.ERROR);
assertThat(logs).hasSize(1);
assertThat(logs.get(0)).contains(" | submitter=FooBar | time=");
}
+ @Test
+ public void log_error_when_task_was_successful_but_ending_state_can_not_be_persisted_to_db() throws Exception {
+ CeTask ceTask = createCeTask("FooBar");
+ when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
+ doThrow(new RuntimeException("Simulate queue#remove failing")).when(queue).remove(ceTask, CeActivityDto.Status.SUCCESS, null, null);
+
+ underTest.call();
+
+ assertThat(logTester.logs(LoggerLevel.ERROR)).containsOnly("Failed to finalize task with uuid '" + ceTask.getUuid() + "' and persist its state to db");
+ }
+
+ @Test
+ public void log_error_when_task_failed_and_ending_state_can_not_be_persisted_to_db() throws Exception {
+ CeTask ceTask = createCeTask("FooBar");
+ when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
+ IllegalStateException ex = makeTaskProcessorFail(ceTask);
+ doThrow(new RuntimeException("Simulate queue#remove failing")).when(queue).remove(ceTask, CeActivityDto.Status.FAILED, null, ex);
+
+ underTest.call();
+
+ List<String> logs = logTester.logs(LoggerLevel.ERROR);
+ assertThat(logs).hasSize(3);
+ assertThat(logs.get(0)).isEqualTo("Failed to execute task " + ceTask.getUuid());
+ assertThat(logs.get(1)).isEqualTo("Failed to finalize task with uuid '" + ceTask.getUuid() + "' and persist its state to db");
+ assertThat(logs.get(2)).contains(" | submitter=FooBar | time=");
+ }
+
+ @Test
+ public void log_error_when_task_failed_with_MessageException_and_ending_state_can_not_be_persisted_to_db() throws Exception {
+ CeTask ceTask = createCeTask("FooBar");
+ when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
+ MessageException ex = makeTaskProcessorFail(ceTask, MessageException.of("simulate MessageException thrown by TaskProcessor#process"));
+ doThrow(new RuntimeException("Simulate queue#remove failing")).when(queue).remove(ceTask, CeActivityDto.Status.FAILED, null, ex);
+
+ underTest.call();
+
+ List<String> logs = logTester.logs(LoggerLevel.ERROR);
+ assertThat(logs).hasSize(2);
+ assertThat(logs.get(0)).isEqualTo("Failed to finalize task with uuid '" + ceTask.getUuid() + "' and persist its state to db. " +
+ "Task failed with MessageException \"" + ex.getMessage() + "\"");
+ assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
+ }
+
private Thread createThreadNameVerifyingThread(String threadName) {
return new Thread(() -> {
verifyUnchangedThreadName(threadName);