From 4518338ceaee07b951afbb0f5da69a164dc79a63 Mon Sep 17 00:00:00 2001 From: =?utf8?q?S=C3=A9bastien=20Lesaint?= Date: Wed, 17 Aug 2016 10:00:36 +0200 Subject: [PATCH] SONAR-7842 persit task error message and stacktrace when occur --- .../computation/queue/InternalCeQueue.java | 7 +- .../queue/InternalCeQueueImpl.java | 115 ++++++++++++++++-- .../taskprocessor/CeWorkerCallableImpl.java | 8 +- .../queue/InternalCeQueueImplTest.java | 50 +++++++- .../CeWorkerCallableImplTest.java | 14 ++- 5 files changed, 169 insertions(+), 25 deletions(-) diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueue.java b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueue.java index b0526f4efa4..fee5ec2959b 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueue.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueue.java @@ -25,7 +25,7 @@ import org.sonar.ce.queue.CeQueue; import org.sonar.ce.queue.CeTask; import org.sonar.ce.queue.CeTaskResult; import org.sonar.db.DbSession; -import org.sonar.db.ce.CeActivityDto; +import org.sonar.db.ce.CeActivityDto.Status; import org.sonar.db.ce.CeQueueDto; /** @@ -50,7 +50,7 @@ public interface InternalCeQueue extends CeQueue { /** * Removes all the tasks from the queue, whatever their status. They are marked - * as {@link CeActivityDto.Status#CANCELED} in past activity. + * as {@link Status#CANCELED} in past activity. * This method can NOT be called when workers are being executed, as in progress * tasks can't be killed. * @@ -63,8 +63,9 @@ public interface InternalCeQueue extends CeQueue { * is called by Compute Engine workers when task is processed and can include an option {@link CeTaskResult} object. * * @throws IllegalStateException if the task does not exist in the queue + * @throws IllegalArgumentException if {@code error} is non {@code null} but {@code status} is not {@link Status#FAILED} */ - void remove(CeTask task, CeActivityDto.Status status, @Nullable CeTaskResult taskResult); + void remove(CeTask task, Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error); void cancel(DbSession dbSession, CeQueueDto ceQueueDto); diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueueImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueueImpl.java index 34cd6d4f337..de096949334 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueueImpl.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueueImpl.java @@ -20,8 +20,14 @@ package org.sonar.server.computation.queue; import com.google.common.base.Optional; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.CheckForNull; import javax.annotation.Nullable; +import org.apache.log4j.Logger; import org.sonar.api.ce.ComputeEngineSide; import org.sonar.api.utils.System2; import org.sonar.ce.monitoring.CEQueueStatus; @@ -34,7 +40,8 @@ import org.sonar.db.DbSession; import org.sonar.db.ce.CeActivityDto; import org.sonar.db.ce.CeQueueDto; -import static java.lang.String.format; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; @ComputeEngineSide public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue { @@ -46,8 +53,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue // state private AtomicBoolean peekPaused = new AtomicBoolean(false); - public InternalCeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory, - CEQueueStatus queueStatus) { + public InternalCeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory, CEQueueStatus queueStatus) { super(dbClient, uuidFactory); this.system2 = system2; this.dbClient = dbClient; @@ -80,19 +86,18 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue } @Override - public void remove(CeTask task, CeActivityDto.Status status, CeTaskResult taskResult) { + public void remove(CeTask task, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) { + checkArgument(error == null || status == CeActivityDto.Status.FAILED, "Error can be provided only when status is FAILED"); DbSession dbSession = dbClient.openSession(false); try { Optional queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, task.getUuid()); - if (!queueDto.isPresent()) { - throw new IllegalStateException(format("Task does not exist anymore: %s", task)); - } + checkState(queueDto.isPresent(), "Task does not exist anymore: %s", task); CeActivityDto activityDto = new CeActivityDto(queueDto.get()); activityDto.setStatus(status); updateQueueStatus(status, activityDto); updateTaskResult(activityDto, taskResult); + updateError(activityDto, error); remove(dbSession, queueDto.get(), activityDto); - } finally { dbClient.closeSession(dbSession); } @@ -107,6 +112,31 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue } } + private static void updateError(CeActivityDto activityDto, @Nullable Throwable error) { + if (error == null) { + return; + } + + activityDto.setErrorMessage(error.getMessage()); + String stacktrace = getStackTraceForPersistence(error); + if (stacktrace != null) { + activityDto.setErrorStacktrace(stacktrace); + } + } + + @CheckForNull + private static String getStackTraceForPersistence(Throwable error) { + try (ByteArrayOutputStream out = new ByteArrayOutputStream(); + LineReturnEnforcedPrintStream printStream = new LineReturnEnforcedPrintStream(out);) { + error.printStackTrace(printStream); + printStream.flush(); + return out.toString(); + } catch (IOException e) { + Logger.getLogger(InternalCeQueueImpl.class).debug("Failed to getStacktrace out of error", e); + return null; + } + } + private void updateQueueStatus(CeActivityDto.Status status, CeActivityDto activityDto) { Long startedAt = activityDto.getStartedAt(); if (startedAt == null) { @@ -142,4 +172,73 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue return peekPaused.get(); } + /** + * A {@link PrintWriter} subclass which enforces that line returns are {@code \n} whichever the platform. + */ + private static class LineReturnEnforcedPrintStream extends PrintWriter { + + LineReturnEnforcedPrintStream(OutputStream out) { + super(out); + } + + @Override + public void println() { + super.print('\n'); + } + + @Override + public void println(boolean x) { + super.print(x); + println(); + } + + @Override + public void println(char x) { + super.print(x); + println(); + } + + @Override + public void println(int x) { + super.print(x); + println(); + } + + @Override + public void println(long x) { + super.print(x); + println(); + } + + @Override + public void println(float x) { + super.print(x); + println(); + } + + @Override + public void println(double x) { + super.print(x); + println(); + } + + @Override + public void println(char[] x) { + super.print(x); + println(); + } + + @Override + public void println(String x) { + super.print(x); + println(); + } + + @Override + public void println(Object x) { + super.print(x); + println(); + } + } + } diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerCallableImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerCallableImpl.java index 7b0ad5f558d..0b80433e5ac 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerCallableImpl.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerCallableImpl.java @@ -75,12 +75,13 @@ public class CeWorkerCallableImpl implements CeWorkerCallable { Profiler ceProfiler = startActivityProfiler(task); CeActivityDto.Status status = CeActivityDto.Status.FAILED; - CeTaskResult process = null; + CeTaskResult taskResult = null; + Throwable error = null; try { // TODO delegate the message to the related task processor, according to task type Optional taskProcessor = taskProcessorRepository.getForCeTask(task); if (taskProcessor.isPresent()) { - process = taskProcessor.get().process(task); + taskResult = taskProcessor.get().process(task); status = CeActivityDto.Status.SUCCESS; } else { LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType()); @@ -88,8 +89,9 @@ public class CeWorkerCallableImpl implements CeWorkerCallable { } } catch (Throwable e) { LOG.error(format("Failed to execute task %s", task.getUuid()), e); + error = e; } finally { - queue.remove(task, status, process); + queue.remove(task, status, taskResult, error); stopActivityProfiler(ceProfiler, task, status); ceLogging.clearForTask(); } diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/queue/InternalCeQueueImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/queue/InternalCeQueueImplTest.java index 2ec21d16240..09cafe12a9a 100644 --- a/server/sonar-server/src/test/java/org/sonar/server/computation/queue/InternalCeQueueImplTest.java +++ b/server/sonar-server/src/test/java/org/sonar/server/computation/queue/InternalCeQueueImplTest.java @@ -20,6 +20,8 @@ package org.sonar.server.computation.queue; import com.google.common.base.Optional; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; import java.util.List; import javax.annotation.Nullable; import org.junit.Rule; @@ -44,6 +46,7 @@ import org.sonar.server.computation.monitoring.CEQueueStatusImpl; import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.guava.api.Assertions.assertThat; import static org.hamcrest.Matchers.startsWith; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -134,7 +137,7 @@ public class InternalCeQueueImplTest { public void test_remove() { CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); Optional peek = underTest.peek(); - underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, null); + underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, null, null); // queue is empty assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), task.getUuid()).isPresent()).isFalse(); @@ -148,11 +151,27 @@ public class InternalCeQueueImplTest { assertThat(history.get().getAnalysisUuid()).isNull(); } + @Test + public void remove_throws_IAE_if_exception_is_provided_but_status_is_SUCCESS() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Error can be provided only when status is FAILED"); + + underTest.remove(mock(CeTask.class), CeActivityDto.Status.SUCCESS, null, new RuntimeException("Some error")); + } + + @Test + public void remove_throws_IAE_if_exception_is_provided_but_status_is_CANCELED() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Error can be provided only when status is FAILED"); + + underTest.remove(mock(CeTask.class), CeActivityDto.Status.CANCELED, null, new RuntimeException("Some error")); + } + @Test public void remove_does_not_set_analysisUuid_in_CeActivity_when_CeTaskResult_has_no_analysis_uuid() { CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); Optional peek = underTest.peek(); - underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(null)); + underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(null), null); // available in history Optional history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid()); @@ -165,7 +184,7 @@ public class InternalCeQueueImplTest { CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); Optional peek = underTest.peek(); - underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(AN_ANALYSIS_UUID)); + underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(AN_ANALYSIS_UUID), null); // available in history Optional history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid()); @@ -173,14 +192,35 @@ public class InternalCeQueueImplTest { assertThat(history.get().getAnalysisUuid()).isEqualTo("U1"); } + @Test + public void remove_saves_error_message_and_stacktrace_when_exception_is_provided() { + Throwable error = new NullPointerException("Fake NPE to test persistence to DB"); + + CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); + Optional peek = underTest.peek(); + underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error); + + Optional activityDto = dbTester.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid()); + assertThat(activityDto).isPresent(); + + assertThat(activityDto.get().getErrorMessage()).isEqualTo(error.getMessage()); + assertThat(activityDto.get().getErrorStacktrace()).isEqualToIgnoringWhitespace(stacktraceToString(error)); + } + + private static String stacktraceToString(Throwable error) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + error.printStackTrace(new PrintStream(out)); + return out.toString(); + } + @Test public void fail_to_remove_if_not_in_queue() throws Exception { CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); - underTest.remove(task, CeActivityDto.Status.SUCCESS, null); + underTest.remove(task, CeActivityDto.Status.SUCCESS, null, null); expectedException.expect(IllegalStateException.class); - underTest.remove(task, CeActivityDto.Status.SUCCESS, null); + underTest.remove(task, CeActivityDto.Status.SUCCESS, null, null); } @Test diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerCallableImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerCallableImplTest.java index d8182ed2324..7ee049e47a9 100644 --- a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerCallableImplTest.java +++ b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerCallableImplTest.java @@ -73,7 +73,7 @@ public class CeWorkerCallableImplTest { assertThat(underTest.call()).isTrue(); inOrder.verify(ceLogging).initForTask(task); - inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null); + inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, null); inOrder.verify(ceLogging).clearForTask(); } @@ -87,7 +87,7 @@ public class CeWorkerCallableImplTest { inOrder.verify(ceLogging).initForTask(task); inOrder.verify(taskProcessor).process(task); - inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS, null); + inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS, null, null); inOrder.verify(ceLogging).clearForTask(); } @@ -96,13 +96,13 @@ public class CeWorkerCallableImplTest { CeTask task = createCeTask(null); when(queue.peek()).thenReturn(Optional.of(task)); taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); - makeTaskProcessorFail(task); + Throwable error = makeTaskProcessorFail(task); assertThat(underTest.call()).isTrue(); inOrder.verify(ceLogging).initForTask(task); inOrder.verify(taskProcessor).process(task); - inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null); + inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, error); inOrder.verify(ceLogging).clearForTask(); } @@ -215,7 +215,9 @@ public class CeWorkerCallableImplTest { return new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(submitterLogin).build(); } - private void makeTaskProcessorFail(CeTask task) { - doThrow(new IllegalStateException("simulate exception thrown by TaskProcessor#process")).when(taskProcessor).process(task); + private IllegalStateException makeTaskProcessorFail(CeTask task) { + IllegalStateException error = new IllegalStateException("simulate exception thrown by TaskProcessor#process"); + doThrow(error).when(taskProcessor).process(task); + return error; } } -- 2.39.5