*/
package org.sonar.ce.notification;
+import java.time.Duration;
import javax.annotation.Nullable;
import org.sonar.api.resources.Qualifiers;
import org.sonar.api.resources.Scopes;
}
@Override
- public void onEnd(CeTask ceTask, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
+ public void onEnd(CeTask ceTask, CeActivityDto.Status status, Duration duration, @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
if (status == CeActivityDto.Status.SUCCESS) {
return;
}
*/
package org.sonar.ce.taskprocessor;
+import java.time.Duration;
import javax.annotation.Nullable;
import org.sonar.ce.task.CeTask;
import org.sonar.ce.task.CeTaskResult;
}
@Override
- public void onEnd(CeTask ceTask, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
+ public void onEnd(CeTask ceTask, CeActivityDto.Status status, Duration duration, @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
ceTaskLogging.clearForTask();
}
}
*/
package org.sonar.ce.taskprocessor;
+import java.time.Duration;
import javax.annotation.Nullable;
import org.sonar.ce.task.CeTask;
import org.sonar.ce.task.CeTaskInterrupter;
}
@Override
- public void onEnd(CeTask ceTask, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
+ public void onEnd(CeTask ceTask, CeActivityDto.Status status, Duration duration, @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
interrupter.onEnd(ceTask);
}
}
*/
package org.sonar.ce.taskprocessor;
+import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
+import org.sonar.api.ce.ComputeEngineSide;
import org.sonar.ce.queue.CeQueue;
import org.sonar.ce.task.CeTask;
import org.sonar.ce.task.CeTaskResult;
* All classes implementing this interface are guaranted to be called for each event, even if another implementation
* failed when called.
*/
+ @ComputeEngineSide
interface ExecutionListener {
/**
* Called when starting executing a {@link CeTask} (which means: after it's been picked for processing, but before
/**
* Called when the processing of the task is finished (which means: after it's been moved to history).
*/
- void onEnd(CeTask ceTask, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error);
+ void onEnd(CeTask ceTask, CeActivityDto.Status status, Duration duration, @Nullable CeTaskResult taskResult, @Nullable Throwable error);
}
}
*/
package org.sonar.ce.taskprocessor;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
private final AtomicReference<RunningState> runningState = new AtomicReference<>();
public CeWorkerImpl(int ordinal, String uuid,
- InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository,
- CeWorkerController ceWorkerController,
- ExecutionListener... listeners) {
+ InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository,
+ CeWorkerController ceWorkerController,
+ ExecutionListener... listeners) {
this.ordinal = checkOrdinal(ordinal);
this.uuid = uuid;
this.queue = queue;
localRunningState = new RunningState(currentThread);
if (!runningState.compareAndSet(null, localRunningState)) {
LOG.warn("Worker {} (UUID=%s) starts executing with new Thread {} while running state isn't null. " +
- "Forcefully updating Workers's running state to new Thread.",
+ "Forcefully updating Workers's running state to new Thread.",
getOrdinal(), getUUID(), currentThread);
runningState.set(localRunningState);
}
localRunningState.runningThread.setName(oldName);
if (!runningState.compareAndSet(localRunningState, null)) {
LOG.warn("Worker {} (UUID=%s) ending execution in Thread {} while running state has already changed." +
- " Keeping this new state.",
+ " Keeping this new state.",
getOrdinal(), getUUID(), localRunningState.runningThread);
}
}
}
try (CeWorkerController.ProcessingRecorderHook processing = ceWorkerController.registerProcessingFor(this);
- ExecuteTask executeTask = new ExecuteTask(localRunningState, ceTask.get())) {
+ ExecuteTask executeTask = new ExecuteTask(localRunningState, ceTask.get())) {
executeTask.run();
} catch (Exception e) {
LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e);
}
private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status,
- @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
+ @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
try {
queue.remove(task, status, taskResult, error);
} catch (Exception e) {
}
LOG.error(format("Failed to finalize task with uuid '%s' and persist its state to db", task.getUuid()), e);
} finally {
- // finalize
- stopLogProfiler(ceProfiler, status);
- callListeners(t -> t.onEnd(task, status, taskResult, error));
+ ceProfiler.addContext("status", status.name());
+ long durationMs = ceProfiler.stopInfo("Executed task");
+ Duration duration = Duration.of(durationMs, ChronoUnit.MILLIS);
+ callListeners(t -> t.onEnd(task, status, duration, taskResult, error));
}
}
}
}
- private static void stopLogProfiler(Profiler profiler, CeActivityDto.Status status) {
- profiler.addContext("status", status.name());
- profiler.stopInfo("Executed task");
- }
-
private static final class RunningState {
private final Thread runningThread;
private CeTask task;
*/
package org.sonar.ce.notification;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Optional;
import java.util.Random;
@Test
public void onEnd_has_no_effect_if_status_is_SUCCESS() {
- fullMockedUnderTest.onEnd(ceTaskMock, CeActivityDto.Status.SUCCESS, ceTaskResultMock, throwableMock);
+ fullMockedUnderTest.onEnd(ceTaskMock, CeActivityDto.Status.SUCCESS, randomDuration(), ceTaskResultMock, throwableMock);
verifyZeroInteractions(ceTaskMock, ceTaskResultMock, throwableMock, notificationService, dbClientMock, serializer, system2);
}
public void onEnd_has_no_effect_if_CeTask_type_is_not_report() {
when(ceTaskMock.getType()).thenReturn(randomAlphanumeric(12));
- fullMockedUnderTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, throwableMock);
+ fullMockedUnderTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, throwableMock);
verifyZeroInteractions(ceTaskResultMock, throwableMock, notificationService, dbClientMock, serializer, system2);
}
public void onEnd_has_no_effect_if_CeTask_has_no_component_uuid() {
when(ceTaskMock.getType()).thenReturn(CeTaskTypes.REPORT);
- fullMockedUnderTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, throwableMock);
+ fullMockedUnderTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, throwableMock);
verifyZeroInteractions(ceTaskResultMock, throwableMock, notificationService, dbClientMock, serializer, system2);
}
when(notificationService.hasProjectSubscribersForTypes(componentUuid, singleton(ReportAnalysisFailureNotification.class)))
.thenReturn(false);
- fullMockedUnderTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, throwableMock);
+ fullMockedUnderTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, throwableMock);
verifyZeroInteractions(ceTaskResultMock, throwableMock, dbClientMock, serializer, system2);
}
expectedException.expect(RowNotFoundException.class);
expectedException.expectMessage("Component with uuid '" + componentUuid + "' not found");
- underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, throwableMock);
+ underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, throwableMock);
}
@Test
when(notificationService.hasProjectSubscribersForTypes(component.uuid(), singleton(ReportAnalysisFailureNotification.class)))
.thenReturn(true);
- underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, throwableMock);
+ underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, throwableMock);
fail("An IllegalArgumentException should have been thrown for component " + component);
} catch (IllegalArgumentException e) {
expectedException.expect(RowNotFoundException.class);
expectedException.expectMessage("CeActivity with uuid '" + taskUuid + "' not found");
- underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, throwableMock);
+ underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, throwableMock);
}
@Test
ComponentDto project = initMocksToPassConditions(taskUuid, createdAt, executedAt);
Notification notificationMock = mockSerializer();
- underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, throwableMock);
+ underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, throwableMock);
ArgumentCaptor<ReportAnalysisFailureNotificationBuilder> notificationCaptor = verifyAndCaptureSerializedNotification();
verify(notificationService).deliver(same(notificationMock));
String message = randomAlphanumeric(66);
when(throwableMock.getMessage()).thenReturn(message);
- underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, throwableMock);
+ underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, throwableMock);
ArgumentCaptor<ReportAnalysisFailureNotificationBuilder> notificationCaptor = verifyAndCaptureSerializedNotification();
initMocksToPassConditions(taskUuid, random.nextInt(999_999), (long) random.nextInt(999_999));
Notification notificationMock = mockSerializer();
- underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, null);
+ underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, null);
verify(notificationService).deliver(same(notificationMock));
ArgumentCaptor<ReportAnalysisFailureNotificationBuilder> notificationCaptor = verifyAndCaptureSerializedNotification();
initMocksToPassConditions(taskUuid, random.nextInt(999_999), (long) random.nextInt(999_999));
Notification notificationMock = mockSerializer();
- underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, null, null);
+ underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), null, null);
verify(notificationService).deliver(same(notificationMock));
}
initMocksToPassConditions(taskUuid, random.nextInt(999_999), (long) random.nextInt(999_999));
Notification notificationMock = mockSerializer();
- underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, null);
+ underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, null);
verify(notificationService).deliver(same(notificationMock));
verifyZeroInteractions(ceTaskResultMock);
when(system2.now()).thenReturn(now);
Notification notificationMock = mockSerializer();
- underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, null);
+ underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, null);
verify(notificationService).deliver(same(notificationMock));
ArgumentCaptor<ReportAnalysisFailureNotificationBuilder> notificationCaptor = verifyAndCaptureSerializedNotification();
.setTaskType(CeTaskTypes.REPORT)
.setComponentUuid(project.uuid())
.setCreatedAt(createdAt))
- .setExecutedAt(executedAt)
- .setStatus(CeActivityDto.Status.FAILED));
+ .setExecutedAt(executedAt)
+ .setStatus(CeActivityDto.Status.FAILED));
dbTester.getSession().commit();
}
verify(serializer).toNotification(notificationCaptor.capture());
return notificationCaptor;
}
+
+ private Duration randomDuration() {
+ return Duration.of(random.nextLong(), ChronoUnit.MILLIS);
+ }
}
*/
package org.sonar.ce.taskprocessor;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
import java.util.Random;
import org.junit.Test;
import org.sonar.ce.task.CeTask;
import org.sonar.ce.task.CeTaskInterrupter;
import org.sonar.db.ce.CeActivityDto;
-import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
CeTask ceTask = mock(CeTask.class);
CeActivityDto.Status randomStatus = CeActivityDto.Status.values()[new Random().nextInt(CeActivityDto.Status.values().length)];
- underTest.onEnd(ceTask, randomStatus, null, null);
+ underTest.onEnd(ceTask, randomStatus, Duration.of(1, ChronoUnit.SECONDS), null, null);
verify(ceTaskInterrupter).onEnd(same(ceTask));
}
*/
package org.sonar.ce.taskprocessor;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
import java.util.Random;
import org.junit.Test;
import org.mockito.Mockito;
public void onEnd_calls_clearForTask() {
underTest.onEnd(mock(CeTask.class),
CeActivityDto.Status.values()[new Random().nextInt(CeActivityDto.Status.values().length)],
- null, null);
+ Duration.of(1, ChronoUnit.SECONDS), null, null);
verify(ceTaskLogging).clearForTask();
verifyNoMoreInteractions(ceTaskLogging);
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
inOrder.verify(executionListener1).onStart(task);
inOrder.verify(executionListener2).onStart(task);
inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, null);
- inOrder.verify(executionListener1).onEnd(task, CeActivityDto.Status.FAILED, null, null);
- inOrder.verify(executionListener2).onEnd(task, CeActivityDto.Status.FAILED, null, null);
+ inOrder.verify(executionListener1).onEnd(eq(task), eq(CeActivityDto.Status.FAILED), any(), isNull(), isNull());
+ inOrder.verify(executionListener2).onEnd(eq(task), eq(CeActivityDto.Status.FAILED), any(), isNull(), isNull());
}
@Test
inOrder.verify(executionListener2).onStart(task);
inOrder.verify(taskProcessor).process(task);
inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS, null, null);
- inOrder.verify(executionListener1).onEnd(task, CeActivityDto.Status.SUCCESS, null, null);
- inOrder.verify(executionListener2).onEnd(task, CeActivityDto.Status.SUCCESS, null, null);
+ inOrder.verify(executionListener1).onEnd(eq(task), eq(CeActivityDto.Status.SUCCESS), any(), isNull(), isNull());
+ inOrder.verify(executionListener2).onEnd(eq(task), eq(CeActivityDto.Status.SUCCESS), any(), isNull(), isNull());
}
@Test
inOrder.verify(executionListener2).onStart(task);
inOrder.verify(taskProcessor).process(task);
inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, error);
- inOrder.verify(executionListener1).onEnd(task, CeActivityDto.Status.FAILED, null, error);
- inOrder.verify(executionListener2).onEnd(task, CeActivityDto.Status.FAILED, null, error);
+ inOrder.verify(executionListener1).onEnd(eq(task), eq(CeActivityDto.Status.FAILED), any(), isNull(), eq(error));
+ inOrder.verify(executionListener2).onEnd(eq(task), eq(CeActivityDto.Status.FAILED), any(), isNull(), eq(error));
}
@Test