import org.sonar.ce.queue.CeTask;
import org.sonar.ce.queue.CeTaskResult;
import org.sonar.db.DbSession;
-import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeActivityDto.Status;
import org.sonar.db.ce.CeQueueDto;
/**
/**
* Removes all the tasks from the queue, whatever their status. They are marked
- * as {@link CeActivityDto.Status#CANCELED} in past activity.
+ * as {@link Status#CANCELED} in past activity.
* This method can NOT be called when workers are being executed, as in progress
* tasks can't be killed.
*
* is called by Compute Engine workers when task is processed and can include an option {@link CeTaskResult} object.
*
* @throws IllegalStateException if the task does not exist in the queue
+ * @throws IllegalArgumentException if {@code error} is non {@code null} but {@code status} is not {@link Status#FAILED}
*/
- void remove(CeTask task, CeActivityDto.Status status, @Nullable CeTaskResult taskResult);
+ void remove(CeTask task, Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error);
void cancel(DbSession dbSession, CeQueueDto ceQueueDto);
package org.sonar.server.computation.queue;
import com.google.common.base.Optional;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
+import org.apache.log4j.Logger;
import org.sonar.api.ce.ComputeEngineSide;
import org.sonar.api.utils.System2;
import org.sonar.ce.monitoring.CEQueueStatus;
import org.sonar.db.ce.CeActivityDto;
import org.sonar.db.ce.CeQueueDto;
-import static java.lang.String.format;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
@ComputeEngineSide
public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue {
// state
private AtomicBoolean peekPaused = new AtomicBoolean(false);
- public InternalCeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory,
- CEQueueStatus queueStatus) {
+ public InternalCeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory, CEQueueStatus queueStatus) {
super(dbClient, uuidFactory);
this.system2 = system2;
this.dbClient = dbClient;
}
@Override
- public void remove(CeTask task, CeActivityDto.Status status, CeTaskResult taskResult) {
+ public void remove(CeTask task, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
+ checkArgument(error == null || status == CeActivityDto.Status.FAILED, "Error can be provided only when status is FAILED");
DbSession dbSession = dbClient.openSession(false);
try {
Optional<CeQueueDto> queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, task.getUuid());
- if (!queueDto.isPresent()) {
- throw new IllegalStateException(format("Task does not exist anymore: %s", task));
- }
+ checkState(queueDto.isPresent(), "Task does not exist anymore: %s", task);
CeActivityDto activityDto = new CeActivityDto(queueDto.get());
activityDto.setStatus(status);
updateQueueStatus(status, activityDto);
updateTaskResult(activityDto, taskResult);
+ updateError(activityDto, error);
remove(dbSession, queueDto.get(), activityDto);
-
} finally {
dbClient.closeSession(dbSession);
}
}
}
+ private static void updateError(CeActivityDto activityDto, @Nullable Throwable error) {
+ if (error == null) {
+ return;
+ }
+
+ activityDto.setErrorMessage(error.getMessage());
+ String stacktrace = getStackTraceForPersistence(error);
+ if (stacktrace != null) {
+ activityDto.setErrorStacktrace(stacktrace);
+ }
+ }
+
+ @CheckForNull
+ private static String getStackTraceForPersistence(Throwable error) {
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+ LineReturnEnforcedPrintStream printStream = new LineReturnEnforcedPrintStream(out);) {
+ error.printStackTrace(printStream);
+ printStream.flush();
+ return out.toString();
+ } catch (IOException e) {
+ Logger.getLogger(InternalCeQueueImpl.class).debug("Failed to getStacktrace out of error", e);
+ return null;
+ }
+ }
+
private void updateQueueStatus(CeActivityDto.Status status, CeActivityDto activityDto) {
Long startedAt = activityDto.getStartedAt();
if (startedAt == null) {
return peekPaused.get();
}
+ /**
+ * A {@link PrintWriter} subclass which enforces that line returns are {@code \n} whichever the platform.
+ */
+ private static class LineReturnEnforcedPrintStream extends PrintWriter {
+
+ LineReturnEnforcedPrintStream(OutputStream out) {
+ super(out);
+ }
+
+ @Override
+ public void println() {
+ super.print('\n');
+ }
+
+ @Override
+ public void println(boolean x) {
+ super.print(x);
+ println();
+ }
+
+ @Override
+ public void println(char x) {
+ super.print(x);
+ println();
+ }
+
+ @Override
+ public void println(int x) {
+ super.print(x);
+ println();
+ }
+
+ @Override
+ public void println(long x) {
+ super.print(x);
+ println();
+ }
+
+ @Override
+ public void println(float x) {
+ super.print(x);
+ println();
+ }
+
+ @Override
+ public void println(double x) {
+ super.print(x);
+ println();
+ }
+
+ @Override
+ public void println(char[] x) {
+ super.print(x);
+ println();
+ }
+
+ @Override
+ public void println(String x) {
+ super.print(x);
+ println();
+ }
+
+ @Override
+ public void println(Object x) {
+ super.print(x);
+ println();
+ }
+ }
+
}
Profiler ceProfiler = startActivityProfiler(task);
CeActivityDto.Status status = CeActivityDto.Status.FAILED;
- CeTaskResult process = null;
+ CeTaskResult taskResult = null;
+ Throwable error = null;
try {
// TODO delegate the message to the related task processor, according to task type
Optional<CeTaskProcessor> taskProcessor = taskProcessorRepository.getForCeTask(task);
if (taskProcessor.isPresent()) {
- process = taskProcessor.get().process(task);
+ taskResult = taskProcessor.get().process(task);
status = CeActivityDto.Status.SUCCESS;
} else {
LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType());
}
} catch (Throwable e) {
LOG.error(format("Failed to execute task %s", task.getUuid()), e);
+ error = e;
} finally {
- queue.remove(task, status, process);
+ queue.remove(task, status, taskResult, error);
stopActivityProfiler(ceProfiler, task, status);
ceLogging.clearForTask();
}
package org.sonar.server.computation.queue;
import com.google.common.base.Optional;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
import java.util.List;
import javax.annotation.Nullable;
import org.junit.Rule;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.guava.api.Assertions.assertThat;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public void test_remove() {
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
Optional<CeTask> peek = underTest.peek();
- underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, null);
+ underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, null, null);
// queue is empty
assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), task.getUuid()).isPresent()).isFalse();
assertThat(history.get().getAnalysisUuid()).isNull();
}
+ @Test
+ public void remove_throws_IAE_if_exception_is_provided_but_status_is_SUCCESS() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Error can be provided only when status is FAILED");
+
+ underTest.remove(mock(CeTask.class), CeActivityDto.Status.SUCCESS, null, new RuntimeException("Some error"));
+ }
+
+ @Test
+ public void remove_throws_IAE_if_exception_is_provided_but_status_is_CANCELED() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Error can be provided only when status is FAILED");
+
+ underTest.remove(mock(CeTask.class), CeActivityDto.Status.CANCELED, null, new RuntimeException("Some error"));
+ }
+
@Test
public void remove_does_not_set_analysisUuid_in_CeActivity_when_CeTaskResult_has_no_analysis_uuid() {
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
Optional<CeTask> peek = underTest.peek();
- underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(null));
+ underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(null), null);
// available in history
Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid());
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
Optional<CeTask> peek = underTest.peek();
- underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(AN_ANALYSIS_UUID));
+ underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(AN_ANALYSIS_UUID), null);
// available in history
Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid());
assertThat(history.get().getAnalysisUuid()).isEqualTo("U1");
}
+ @Test
+ public void remove_saves_error_message_and_stacktrace_when_exception_is_provided() {
+ Throwable error = new NullPointerException("Fake NPE to test persistence to DB");
+
+ CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+ Optional<CeTask> peek = underTest.peek();
+ underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error);
+
+ Optional<CeActivityDto> activityDto = dbTester.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid());
+ assertThat(activityDto).isPresent();
+
+ assertThat(activityDto.get().getErrorMessage()).isEqualTo(error.getMessage());
+ assertThat(activityDto.get().getErrorStacktrace()).isEqualToIgnoringWhitespace(stacktraceToString(error));
+ }
+
+ private static String stacktraceToString(Throwable error) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ error.printStackTrace(new PrintStream(out));
+ return out.toString();
+ }
+
@Test
public void fail_to_remove_if_not_in_queue() throws Exception {
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
- underTest.remove(task, CeActivityDto.Status.SUCCESS, null);
+ underTest.remove(task, CeActivityDto.Status.SUCCESS, null, null);
expectedException.expect(IllegalStateException.class);
- underTest.remove(task, CeActivityDto.Status.SUCCESS, null);
+ underTest.remove(task, CeActivityDto.Status.SUCCESS, null, null);
}
@Test
assertThat(underTest.call()).isTrue();
inOrder.verify(ceLogging).initForTask(task);
- inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null);
+ inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, null);
inOrder.verify(ceLogging).clearForTask();
}
inOrder.verify(ceLogging).initForTask(task);
inOrder.verify(taskProcessor).process(task);
- inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS, null);
+ inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS, null, null);
inOrder.verify(ceLogging).clearForTask();
}
CeTask task = createCeTask(null);
when(queue.peek()).thenReturn(Optional.of(task));
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
- makeTaskProcessorFail(task);
+ Throwable error = makeTaskProcessorFail(task);
assertThat(underTest.call()).isTrue();
inOrder.verify(ceLogging).initForTask(task);
inOrder.verify(taskProcessor).process(task);
- inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null);
+ inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, error);
inOrder.verify(ceLogging).clearForTask();
}
return new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(submitterLogin).build();
}
- private void makeTaskProcessorFail(CeTask task) {
- doThrow(new IllegalStateException("simulate exception thrown by TaskProcessor#process")).when(taskProcessor).process(task);
+ private IllegalStateException makeTaskProcessorFail(CeTask task) {
+ IllegalStateException error = new IllegalStateException("simulate exception thrown by TaskProcessor#process");
+ doThrow(error).when(taskProcessor).process(task);
+ return error;
}
}