@@ -19,6 +19,7 @@ | |||
*/ | |||
package org.sonar.ce.notification; | |||
import java.time.Duration; | |||
import javax.annotation.Nullable; | |||
import org.sonar.api.resources.Qualifiers; | |||
import org.sonar.api.resources.Scopes; | |||
@@ -61,7 +62,7 @@ public class ReportAnalysisFailureNotificationExecutionListener implements CeWor | |||
} | |||
@Override | |||
public void onEnd(CeTask ceTask, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) { | |||
public void onEnd(CeTask ceTask, CeActivityDto.Status status, Duration duration, @Nullable CeTaskResult taskResult, @Nullable Throwable error) { | |||
if (status == CeActivityDto.Status.SUCCESS) { | |||
return; | |||
} |
@@ -19,6 +19,7 @@ | |||
*/ | |||
package org.sonar.ce.taskprocessor; | |||
import java.time.Duration; | |||
import javax.annotation.Nullable; | |||
import org.sonar.ce.task.CeTask; | |||
import org.sonar.ce.task.CeTaskResult; | |||
@@ -42,7 +43,7 @@ public class CeLoggingWorkerExecutionListener implements CeWorker.ExecutionListe | |||
} | |||
@Override | |||
public void onEnd(CeTask ceTask, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) { | |||
public void onEnd(CeTask ceTask, CeActivityDto.Status status, Duration duration, @Nullable CeTaskResult taskResult, @Nullable Throwable error) { | |||
ceTaskLogging.clearForTask(); | |||
} | |||
} |
@@ -19,6 +19,7 @@ | |||
*/ | |||
package org.sonar.ce.taskprocessor; | |||
import java.time.Duration; | |||
import javax.annotation.Nullable; | |||
import org.sonar.ce.task.CeTask; | |||
import org.sonar.ce.task.CeTaskInterrupter; | |||
@@ -38,7 +39,7 @@ public class CeTaskInterrupterWorkerExecutionListener implements CeWorker.Execut | |||
} | |||
@Override | |||
public void onEnd(CeTask ceTask, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) { | |||
public void onEnd(CeTask ceTask, CeActivityDto.Status status, Duration duration, @Nullable CeTaskResult taskResult, @Nullable Throwable error) { | |||
interrupter.onEnd(ceTask); | |||
} | |||
} |
@@ -19,9 +19,11 @@ | |||
*/ | |||
package org.sonar.ce.taskprocessor; | |||
import java.time.Duration; | |||
import java.util.Optional; | |||
import java.util.concurrent.Callable; | |||
import javax.annotation.Nullable; | |||
import org.sonar.api.ce.ComputeEngineSide; | |||
import org.sonar.ce.queue.CeQueue; | |||
import org.sonar.ce.task.CeTask; | |||
import org.sonar.ce.task.CeTaskResult; | |||
@@ -68,6 +70,7 @@ public interface CeWorker extends Callable<CeWorker.Result> { | |||
* All classes implementing this interface are guaranted to be called for each event, even if another implementation | |||
* failed when called. | |||
*/ | |||
@ComputeEngineSide | |||
interface ExecutionListener { | |||
/** | |||
* Called when starting executing a {@link CeTask} (which means: after it's been picked for processing, but before | |||
@@ -78,6 +81,6 @@ public interface CeWorker extends Callable<CeWorker.Result> { | |||
/** | |||
* Called when the processing of the task is finished (which means: after it's been moved to history). | |||
*/ | |||
void onEnd(CeTask ceTask, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error); | |||
void onEnd(CeTask ceTask, CeActivityDto.Status status, Duration duration, @Nullable CeTaskResult taskResult, @Nullable Throwable error); | |||
} | |||
} |
@@ -19,6 +19,8 @@ | |||
*/ | |||
package org.sonar.ce.taskprocessor; | |||
import java.time.Duration; | |||
import java.time.temporal.ChronoUnit; | |||
import java.util.Arrays; | |||
import java.util.List; | |||
import java.util.Map; | |||
@@ -61,9 +63,9 @@ public class CeWorkerImpl implements CeWorker { | |||
private final AtomicReference<RunningState> runningState = new AtomicReference<>(); | |||
public CeWorkerImpl(int ordinal, String uuid, | |||
InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository, | |||
CeWorkerController ceWorkerController, | |||
ExecutionListener... listeners) { | |||
InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository, | |||
CeWorkerController ceWorkerController, | |||
ExecutionListener... listeners) { | |||
this.ordinal = checkOrdinal(ordinal); | |||
this.uuid = uuid; | |||
this.queue = queue; | |||
@@ -117,7 +119,7 @@ public class CeWorkerImpl implements CeWorker { | |||
localRunningState = new RunningState(currentThread); | |||
if (!runningState.compareAndSet(null, localRunningState)) { | |||
LOG.warn("Worker {} (UUID=%s) starts executing with new Thread {} while running state isn't null. " + | |||
"Forcefully updating Workers's running state to new Thread.", | |||
"Forcefully updating Workers's running state to new Thread.", | |||
getOrdinal(), getUUID(), currentThread); | |||
runningState.set(localRunningState); | |||
} | |||
@@ -136,7 +138,7 @@ public class CeWorkerImpl implements CeWorker { | |||
localRunningState.runningThread.setName(oldName); | |||
if (!runningState.compareAndSet(localRunningState, null)) { | |||
LOG.warn("Worker {} (UUID=%s) ending execution in Thread {} while running state has already changed." + | |||
" Keeping this new state.", | |||
" Keeping this new state.", | |||
getOrdinal(), getUUID(), localRunningState.runningThread); | |||
} | |||
} | |||
@@ -152,7 +154,7 @@ public class CeWorkerImpl implements CeWorker { | |||
} | |||
try (CeWorkerController.ProcessingRecorderHook processing = ceWorkerController.registerProcessingFor(this); | |||
ExecuteTask executeTask = new ExecuteTask(localRunningState, ceTask.get())) { | |||
ExecuteTask executeTask = new ExecuteTask(localRunningState, ceTask.get())) { | |||
executeTask.run(); | |||
} catch (Exception e) { | |||
LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e); | |||
@@ -235,7 +237,7 @@ public class CeWorkerImpl implements CeWorker { | |||
} | |||
private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status, | |||
@Nullable CeTaskResult taskResult, @Nullable Throwable error) { | |||
@Nullable CeTaskResult taskResult, @Nullable Throwable error) { | |||
try { | |||
queue.remove(task, status, taskResult, error); | |||
} catch (Exception e) { | |||
@@ -244,9 +246,10 @@ public class CeWorkerImpl implements CeWorker { | |||
} | |||
LOG.error(format("Failed to finalize task with uuid '%s' and persist its state to db", task.getUuid()), e); | |||
} finally { | |||
// finalize | |||
stopLogProfiler(ceProfiler, status); | |||
callListeners(t -> t.onEnd(task, status, taskResult, error)); | |||
ceProfiler.addContext("status", status.name()); | |||
long durationMs = ceProfiler.stopInfo("Executed task"); | |||
Duration duration = Duration.of(durationMs, ChronoUnit.MILLIS); | |||
callListeners(t -> t.onEnd(task, status, duration, taskResult, error)); | |||
} | |||
} | |||
@@ -289,11 +292,6 @@ public class CeWorkerImpl implements CeWorker { | |||
} | |||
} | |||
private static void stopLogProfiler(Profiler profiler, CeActivityDto.Status status) { | |||
profiler.addContext("status", status.name()); | |||
profiler.stopInfo("Executed task"); | |||
} | |||
private static final class RunningState { | |||
private final Thread runningThread; | |||
private CeTask task; |
@@ -19,6 +19,8 @@ | |||
*/ | |||
package org.sonar.ce.notification; | |||
import java.time.Duration; | |||
import java.time.temporal.ChronoUnit; | |||
import java.util.Arrays; | |||
import java.util.Optional; | |||
import java.util.Random; | |||
@@ -96,7 +98,7 @@ public class ReportAnalysisFailureNotificationExecutionListenerTest { | |||
@Test | |||
public void onEnd_has_no_effect_if_status_is_SUCCESS() { | |||
fullMockedUnderTest.onEnd(ceTaskMock, CeActivityDto.Status.SUCCESS, ceTaskResultMock, throwableMock); | |||
fullMockedUnderTest.onEnd(ceTaskMock, CeActivityDto.Status.SUCCESS, randomDuration(), ceTaskResultMock, throwableMock); | |||
verifyZeroInteractions(ceTaskMock, ceTaskResultMock, throwableMock, notificationService, dbClientMock, serializer, system2); | |||
} | |||
@@ -105,7 +107,7 @@ public class ReportAnalysisFailureNotificationExecutionListenerTest { | |||
public void onEnd_has_no_effect_if_CeTask_type_is_not_report() { | |||
when(ceTaskMock.getType()).thenReturn(randomAlphanumeric(12)); | |||
fullMockedUnderTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, throwableMock); | |||
fullMockedUnderTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, throwableMock); | |||
verifyZeroInteractions(ceTaskResultMock, throwableMock, notificationService, dbClientMock, serializer, system2); | |||
} | |||
@@ -114,7 +116,7 @@ public class ReportAnalysisFailureNotificationExecutionListenerTest { | |||
public void onEnd_has_no_effect_if_CeTask_has_no_component_uuid() { | |||
when(ceTaskMock.getType()).thenReturn(CeTaskTypes.REPORT); | |||
fullMockedUnderTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, throwableMock); | |||
fullMockedUnderTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, throwableMock); | |||
verifyZeroInteractions(ceTaskResultMock, throwableMock, notificationService, dbClientMock, serializer, system2); | |||
} | |||
@@ -127,7 +129,7 @@ public class ReportAnalysisFailureNotificationExecutionListenerTest { | |||
when(notificationService.hasProjectSubscribersForTypes(componentUuid, singleton(ReportAnalysisFailureNotification.class))) | |||
.thenReturn(false); | |||
fullMockedUnderTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, throwableMock); | |||
fullMockedUnderTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, throwableMock); | |||
verifyZeroInteractions(ceTaskResultMock, throwableMock, dbClientMock, serializer, system2); | |||
} | |||
@@ -143,7 +145,7 @@ public class ReportAnalysisFailureNotificationExecutionListenerTest { | |||
expectedException.expect(RowNotFoundException.class); | |||
expectedException.expectMessage("Component with uuid '" + componentUuid + "' not found"); | |||
underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, throwableMock); | |||
underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, throwableMock); | |||
} | |||
@Test | |||
@@ -166,7 +168,7 @@ public class ReportAnalysisFailureNotificationExecutionListenerTest { | |||
when(notificationService.hasProjectSubscribersForTypes(component.uuid(), singleton(ReportAnalysisFailureNotification.class))) | |||
.thenReturn(true); | |||
underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, throwableMock); | |||
underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, throwableMock); | |||
fail("An IllegalArgumentException should have been thrown for component " + component); | |||
} catch (IllegalArgumentException e) { | |||
@@ -190,7 +192,7 @@ public class ReportAnalysisFailureNotificationExecutionListenerTest { | |||
expectedException.expect(RowNotFoundException.class); | |||
expectedException.expectMessage("CeActivity with uuid '" + taskUuid + "' not found"); | |||
underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, throwableMock); | |||
underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, throwableMock); | |||
} | |||
@Test | |||
@@ -201,7 +203,7 @@ public class ReportAnalysisFailureNotificationExecutionListenerTest { | |||
ComponentDto project = initMocksToPassConditions(taskUuid, createdAt, executedAt); | |||
Notification notificationMock = mockSerializer(); | |||
underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, throwableMock); | |||
underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, throwableMock); | |||
ArgumentCaptor<ReportAnalysisFailureNotificationBuilder> notificationCaptor = verifyAndCaptureSerializedNotification(); | |||
verify(notificationService).deliver(same(notificationMock)); | |||
@@ -225,7 +227,7 @@ public class ReportAnalysisFailureNotificationExecutionListenerTest { | |||
String message = randomAlphanumeric(66); | |||
when(throwableMock.getMessage()).thenReturn(message); | |||
underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, throwableMock); | |||
underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, throwableMock); | |||
ArgumentCaptor<ReportAnalysisFailureNotificationBuilder> notificationCaptor = verifyAndCaptureSerializedNotification(); | |||
@@ -239,7 +241,7 @@ public class ReportAnalysisFailureNotificationExecutionListenerTest { | |||
initMocksToPassConditions(taskUuid, random.nextInt(999_999), (long) random.nextInt(999_999)); | |||
Notification notificationMock = mockSerializer(); | |||
underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, null); | |||
underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, null); | |||
verify(notificationService).deliver(same(notificationMock)); | |||
ArgumentCaptor<ReportAnalysisFailureNotificationBuilder> notificationCaptor = verifyAndCaptureSerializedNotification(); | |||
@@ -254,7 +256,7 @@ public class ReportAnalysisFailureNotificationExecutionListenerTest { | |||
initMocksToPassConditions(taskUuid, random.nextInt(999_999), (long) random.nextInt(999_999)); | |||
Notification notificationMock = mockSerializer(); | |||
underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, null, null); | |||
underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), null, null); | |||
verify(notificationService).deliver(same(notificationMock)); | |||
} | |||
@@ -265,7 +267,7 @@ public class ReportAnalysisFailureNotificationExecutionListenerTest { | |||
initMocksToPassConditions(taskUuid, random.nextInt(999_999), (long) random.nextInt(999_999)); | |||
Notification notificationMock = mockSerializer(); | |||
underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, null); | |||
underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, null); | |||
verify(notificationService).deliver(same(notificationMock)); | |||
verifyZeroInteractions(ceTaskResultMock); | |||
@@ -279,7 +281,7 @@ public class ReportAnalysisFailureNotificationExecutionListenerTest { | |||
when(system2.now()).thenReturn(now); | |||
Notification notificationMock = mockSerializer(); | |||
underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, ceTaskResultMock, null); | |||
underTest.onEnd(ceTaskMock, CeActivityDto.Status.FAILED, randomDuration(), ceTaskResultMock, null); | |||
verify(notificationService).deliver(same(notificationMock)); | |||
ArgumentCaptor<ReportAnalysisFailureNotificationBuilder> notificationCaptor = verifyAndCaptureSerializedNotification(); | |||
@@ -309,8 +311,8 @@ public class ReportAnalysisFailureNotificationExecutionListenerTest { | |||
.setTaskType(CeTaskTypes.REPORT) | |||
.setComponentUuid(project.uuid()) | |||
.setCreatedAt(createdAt)) | |||
.setExecutedAt(executedAt) | |||
.setStatus(CeActivityDto.Status.FAILED)); | |||
.setExecutedAt(executedAt) | |||
.setStatus(CeActivityDto.Status.FAILED)); | |||
dbTester.getSession().commit(); | |||
} | |||
@@ -319,4 +321,8 @@ public class ReportAnalysisFailureNotificationExecutionListenerTest { | |||
verify(serializer).toNotification(notificationCaptor.capture()); | |||
return notificationCaptor; | |||
} | |||
private Duration randomDuration() { | |||
return Duration.of(random.nextLong(), ChronoUnit.MILLIS); | |||
} | |||
} |
@@ -19,13 +19,14 @@ | |||
*/ | |||
package org.sonar.ce.taskprocessor; | |||
import java.time.Duration; | |||
import java.time.temporal.ChronoUnit; | |||
import java.util.Random; | |||
import org.junit.Test; | |||
import org.sonar.ce.task.CeTask; | |||
import org.sonar.ce.task.CeTaskInterrupter; | |||
import org.sonar.db.ce.CeActivityDto; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.mockito.ArgumentMatchers.same; | |||
import static org.mockito.Mockito.mock; | |||
import static org.mockito.Mockito.verify; | |||
@@ -49,7 +50,7 @@ public class CeTaskInterrupterWorkerExecutionListenerTest { | |||
CeTask ceTask = mock(CeTask.class); | |||
CeActivityDto.Status randomStatus = CeActivityDto.Status.values()[new Random().nextInt(CeActivityDto.Status.values().length)]; | |||
underTest.onEnd(ceTask, randomStatus, null, null); | |||
underTest.onEnd(ceTask, randomStatus, Duration.of(1, ChronoUnit.SECONDS), null, null); | |||
verify(ceTaskInterrupter).onEnd(same(ceTask)); | |||
} |
@@ -19,6 +19,8 @@ | |||
*/ | |||
package org.sonar.ce.taskprocessor; | |||
import java.time.Duration; | |||
import java.time.temporal.ChronoUnit; | |||
import java.util.Random; | |||
import org.junit.Test; | |||
import org.mockito.Mockito; | |||
@@ -48,7 +50,7 @@ public class CeTaskLoggingWorkerExecutionListenerTest { | |||
public void onEnd_calls_clearForTask() { | |||
underTest.onEnd(mock(CeTask.class), | |||
CeActivityDto.Status.values()[new Random().nextInt(CeActivityDto.Status.values().length)], | |||
null, null); | |||
Duration.of(1, ChronoUnit.SECONDS), null, null); | |||
verify(ceTaskLogging).clearForTask(); | |||
verifyNoMoreInteractions(ceTaskLogging); |
@@ -61,6 +61,8 @@ import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.mockito.ArgumentMatchers.any; | |||
import static org.mockito.ArgumentMatchers.anyString; | |||
import static org.mockito.ArgumentMatchers.eq; | |||
import static org.mockito.ArgumentMatchers.isNull; | |||
import static org.mockito.Mockito.doThrow; | |||
import static org.mockito.Mockito.mock; | |||
import static org.mockito.Mockito.reset; | |||
@@ -170,8 +172,8 @@ public class CeWorkerImplTest { | |||
inOrder.verify(executionListener1).onStart(task); | |||
inOrder.verify(executionListener2).onStart(task); | |||
inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, null); | |||
inOrder.verify(executionListener1).onEnd(task, CeActivityDto.Status.FAILED, null, null); | |||
inOrder.verify(executionListener2).onEnd(task, CeActivityDto.Status.FAILED, null, null); | |||
inOrder.verify(executionListener1).onEnd(eq(task), eq(CeActivityDto.Status.FAILED), any(), isNull(), isNull()); | |||
inOrder.verify(executionListener2).onEnd(eq(task), eq(CeActivityDto.Status.FAILED), any(), isNull(), isNull()); | |||
} | |||
@Test | |||
@@ -200,8 +202,8 @@ public class CeWorkerImplTest { | |||
inOrder.verify(executionListener2).onStart(task); | |||
inOrder.verify(taskProcessor).process(task); | |||
inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS, null, null); | |||
inOrder.verify(executionListener1).onEnd(task, CeActivityDto.Status.SUCCESS, null, null); | |||
inOrder.verify(executionListener2).onEnd(task, CeActivityDto.Status.SUCCESS, null, null); | |||
inOrder.verify(executionListener1).onEnd(eq(task), eq(CeActivityDto.Status.SUCCESS), any(), isNull(), isNull()); | |||
inOrder.verify(executionListener2).onEnd(eq(task), eq(CeActivityDto.Status.SUCCESS), any(), isNull(), isNull()); | |||
} | |||
@Test | |||
@@ -232,8 +234,8 @@ public class CeWorkerImplTest { | |||
inOrder.verify(executionListener2).onStart(task); | |||
inOrder.verify(taskProcessor).process(task); | |||
inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, error); | |||
inOrder.verify(executionListener1).onEnd(task, CeActivityDto.Status.FAILED, null, error); | |||
inOrder.verify(executionListener2).onEnd(task, CeActivityDto.Status.FAILED, null, error); | |||
inOrder.verify(executionListener1).onEnd(eq(task), eq(CeActivityDto.Status.FAILED), any(), isNull(), eq(error)); | |||
inOrder.verify(executionListener2).onEnd(eq(task), eq(CeActivityDto.Status.FAILED), any(), isNull(), eq(error)); | |||
} | |||
@Test |