package org.sonar.ce.taskprocessor;
import java.util.Optional;
+import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.sonar.api.utils.log.Logger;
import org.sonar.api.utils.log.Loggers;
@Override
public Result call() throws Exception {
+ return withCustomizedThreadName(this::findAndProcessTask);
+ }
+
+ private Result findAndProcessTask() {
Optional<CeTask> ceTask = tryAndFindTaskToExecute();
if (!ceTask.isPresent()) {
return NO_TASK;
return TASK_PROCESSED;
}
+ private <T> T withCustomizedThreadName(Supplier<T> supplier) {
+ Thread currentThread = Thread.currentThread();
+ String oldName = currentThread.getName();
+ try {
+ currentThread.setName(String.format("Worker %s (UUID=%s) on %s", getOrdinal(), getUUID(), oldName));
+ return supplier.get();
+ } finally {
+ currentThread.setName(oldName);
+ }
+ }
+
private Optional<CeTask> tryAndFindTaskToExecute() {
try {
return queue.peek(uuid);
import java.util.Random;
import java.util.UUID;
import javax.annotation.Nullable;
+import org.apache.commons.lang.RandomStringUtils;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
}
+ @Test
+ public void call_sets_and_restores_thread_name_with_information_of_worker_when_there_is_no_task_to_process() throws Exception {
+ String threadName = RandomStringUtils.randomAlphabetic(3);
+ when(queue.peek(anyString())).thenAnswer(invocation -> {
+ assertThat(Thread.currentThread().getName())
+ .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
+ return Optional.empty();
+ });
+ Thread newThread = createThreadNameVerifyingThread(threadName);
+
+ newThread.start();
+ newThread.join();
+ }
+
+ @Test
+ public void call_sets_and_restores_thread_name_with_information_of_worker_when_a_task_is_processed() throws Exception {
+ String threadName = RandomStringUtils.randomAlphabetic(3);
+ when(queue.peek(anyString())).thenAnswer(invocation -> {
+ assertThat(Thread.currentThread().getName())
+ .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
+ return Optional.of(createCeTask("FooBar"));
+ });
+ taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
+ Thread newThread = createThreadNameVerifyingThread(threadName);
+
+ newThread.start();
+ newThread.join();
+ }
+
+ @Test
+ public void call_sets_and_restores_thread_name_with_information_of_worker_when_an_error_occurs() throws Exception {
+ String threadName = RandomStringUtils.randomAlphabetic(3);
+ CeTask ceTask = createCeTask("FooBar");
+ when(queue.peek(anyString())).thenAnswer(invocation -> {
+ assertThat(Thread.currentThread().getName())
+ .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
+ return Optional.of(ceTask);
+ });
+ taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
+ makeTaskProcessorFail(ceTask);
+ Thread newThread = createThreadNameVerifyingThread(threadName);
+
+ newThread.start();
+ newThread.join();
+ }
+
+ private Thread createThreadNameVerifyingThread(String threadName) {
+ return new Thread(() -> {
+ verifyUnchangedThreadName(threadName);
+ try {
+ underTest.call();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ verifyUnchangedThreadName(threadName);
+ }, threadName);
+ }
+
+ private void verifyUnchangedThreadName(String threadName) {
+ assertThat(Thread.currentThread().getName()).isEqualTo(threadName);
+ }
+
private void verifyWorkerUuid() {
verify(queue).peek(workerUuidCaptor.capture());
assertThat(workerUuidCaptor.getValue()).isEqualTo(workerUuid);