]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-8985 add unique identifier to CeWorker
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Tue, 18 Apr 2017 10:35:48 +0000 (12:35 +0200)
committerEric Hartmann <hartmann.eric@gmail.Com>
Thu, 27 Apr 2017 07:23:18 +0000 (09:23 +0200)
14 files changed:
server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfiguration.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorService.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java [new file with mode: 0644]
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallable.java [deleted file]
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java [deleted file]
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactory.java [new file with mode: 0644]
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java [new file with mode: 0644]
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java [new file with mode: 0644]
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java [deleted file]
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java [new file with mode: 0644]
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java [new file with mode: 0644]

index eedc8d4893fbe829319faede37378a4c74424c4e..dd27ba3f88209397234ce78f96a57cc779bce86d 100644 (file)
@@ -27,8 +27,8 @@ public interface CeConfiguration {
   int getWorkerCount();
 
   /**
-   * The delay in milliseconds before calling another {@link org.sonar.server.computation.taskprocessor.CeWorkerCallable}
-   * when previous one had nothing to do.
+   * The delay in millisecond before a {@link CeWorker} shall try and find a task
+   * to process when it's previous execution had nothing to do.
    */
   long getQueuePollingDelay();
 
index 2aa14e6962da264093bdb51a2d2d39cf0c7bd5af..75097fd6d0cef5e7495dfb49268b923ec6131933 100644 (file)
@@ -23,7 +23,7 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import org.sonar.server.util.StoppableExecutorService;
 
 /**
- * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerCallableImpl}.
+ * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerImpl}.
  */
 public interface CeProcessingSchedulerExecutorService extends StoppableExecutorService, ListeningScheduledExecutorService {
 }
index 47f56af6ff123c7aee3a3194b7c050b087877ade..fd145a6815d0b53f9b7912a12a14e61d7b34a7a6 100644 (file)
@@ -39,16 +39,14 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
   private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class);
 
   private final CeProcessingSchedulerExecutorService executorService;
-  private final CeWorkerCallable workerRunnable;
 
   private final long delayBetweenTasks;
   private final TimeUnit timeUnit;
   private final ChainingCallback[] chainingCallbacks;
 
   public CeProcessingSchedulerImpl(CeConfiguration ceConfiguration,
-    CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerCallable workerRunnable) {
+    CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory) {
     this.executorService = processingExecutorService;
-    this.workerRunnable = workerRunnable;
 
     this.delayBetweenTasks = ceConfiguration.getQueuePollingDelay();
     this.timeUnit = MILLISECONDS;
@@ -56,7 +54,8 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
     int workerCount = ceConfiguration.getWorkerCount();
     this.chainingCallbacks = new ChainingCallback[workerCount];
     for (int i = 0; i < workerCount; i++) {
-      chainingCallbacks[i] = new ChainingCallback();
+      CeWorker worker = ceCeWorkerFactory.create();
+      chainingCallbacks[i] = new ChainingCallback(worker);
     }
   }
 
@@ -68,7 +67,7 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
   @Override
   public void startScheduling() {
     for (ChainingCallback chainingCallback : chainingCallbacks) {
-      ListenableScheduledFuture<Boolean> future = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit);
+      ListenableScheduledFuture<Boolean> future = executorService.schedule(chainingCallback.worker, delayBetweenTasks, timeUnit);
       addCallback(future, chainingCallback, executorService);
     }
   }
@@ -82,9 +81,15 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
 
   private class ChainingCallback implements FutureCallback<Boolean> {
     private final AtomicBoolean keepRunning = new AtomicBoolean(true);
+    private final CeWorker worker;
+
     @CheckForNull
     private ListenableFuture<Boolean> workerFuture;
 
+    public ChainingCallback(CeWorker worker) {
+      this.worker = worker;
+    }
+
     @Override
     public void onSuccess(@Nullable Boolean result) {
       if (result != null && result) {
@@ -105,14 +110,14 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
 
     private void chainWithoutDelay() {
       if (keepRunning()) {
-        workerFuture = executorService.submit(workerRunnable);
+        workerFuture = executorService.submit(worker);
       }
       addCallback();
     }
 
     private void chainWithDelay() {
       if (keepRunning()) {
-        workerFuture = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit);
+        workerFuture = executorService.schedule(worker, delayBetweenTasks, timeUnit);
       }
       addCallback();
     }
index b6f08854b8cff0d616a784c35d9a458a1abf9ec7..9163507abe965ac066839775ab4872e05a0002d1 100644 (file)
@@ -26,7 +26,7 @@ public class CeTaskProcessorModule extends Module {
   protected void configureModule() {
     add(
       CeTaskProcessorRepositoryImpl.class,
-      CeWorkerCallableImpl.class,
+      CeWorkerFactoryImpl.class,
       CeProcessingSchedulerExecutorServiceImpl.class,
       CeProcessingSchedulerImpl.class);
   }
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java
new file mode 100644 (file)
index 0000000..7390b4a
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.concurrent.Callable;
+import org.sonar.ce.queue.CeQueue;
+import org.sonar.ce.queue.CeTask;
+
+/**
+ * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}.
+ * {@link Callable#call()} returns a Boolean which is {@code true} when some a {@link CeTask} was processed,
+ * {@code false} otherwise.
+ */
+public interface CeWorker extends Callable<Boolean> {
+  String getUUID();
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallable.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallable.java
deleted file mode 100644 (file)
index 2618b37..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * SonarQube
- * Copyright (C) 2009-2017 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.ce.taskprocessor;
-
-import java.util.concurrent.Callable;
-import org.sonar.ce.queue.CeQueue;
-import org.sonar.ce.queue.CeTask;
-
-/**
- * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}.
- * {@link Callable#call()} returns a Boolean which is {@code true} when some a {@link CeTask} was processed,
- * {@code false} otherwise.
- */
-public interface CeWorkerCallable extends Callable<Boolean> {
-}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java
deleted file mode 100644 (file)
index eb6d9d2..0000000
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * SonarQube
- * Copyright (C) 2009-2017 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.ce.taskprocessor;
-
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Nullable;
-import org.sonar.api.utils.log.Logger;
-import org.sonar.api.utils.log.Loggers;
-import org.sonar.ce.log.CeLogging;
-import org.sonar.ce.queue.CeTask;
-import org.sonar.ce.queue.CeTaskResult;
-import org.sonar.ce.queue.InternalCeQueue;
-import org.sonar.core.util.logs.Profiler;
-import org.sonar.db.ce.CeActivityDto;
-
-import static java.lang.String.format;
-
-public class CeWorkerCallableImpl implements CeWorkerCallable {
-
-  private static final Logger LOG = Loggers.get(CeWorkerCallableImpl.class);
-
-  private final InternalCeQueue queue;
-  private final CeLogging ceLogging;
-  private final CeTaskProcessorRepository taskProcessorRepository;
-
-  public CeWorkerCallableImpl(InternalCeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository) {
-    this.queue = queue;
-    this.ceLogging = ceLogging;
-    this.taskProcessorRepository = taskProcessorRepository;
-  }
-
-  @Override
-  public Boolean call() throws Exception {
-    Optional<CeTask> ceTask = tryAndFindTaskToExecute();
-    if (!ceTask.isPresent()) {
-      return false;
-    }
-
-    try {
-      executeTask(ceTask.get());
-    } catch (Exception e) {
-      LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e);
-    }
-    return true;
-  }
-
-  private static final AtomicLong counter = new AtomicLong(0);
-  private Optional<CeTask> tryAndFindTaskToExecute() {
-    try {
-      return queue.peek("uuid" + counter.addAndGet(100));
-    } catch (Exception e) {
-      LOG.error("Failed to pop the queue of analysis reports", e);
-    }
-    return Optional.empty();
-  }
-
-  private void executeTask(CeTask task) {
-    ceLogging.initForTask(task);
-    Profiler ceProfiler = startActivityProfiler(task);
-
-    CeActivityDto.Status status = CeActivityDto.Status.FAILED;
-    CeTaskResult taskResult = null;
-    Throwable error = null;
-    try {
-      // TODO delegate the message to the related task processor, according to task type
-      Optional<CeTaskProcessor> taskProcessor = taskProcessorRepository.getForCeTask(task);
-      if (taskProcessor.isPresent()) {
-        taskResult = taskProcessor.get().process(task);
-        status = CeActivityDto.Status.SUCCESS;
-      } else {
-        LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType());
-        status = CeActivityDto.Status.FAILED;
-      }
-    } catch (Throwable e) {
-      LOG.error(format("Failed to execute task %s", task.getUuid()), e);
-      error = e;
-    } finally {
-      finalizeTask(task, ceProfiler, status, taskResult, error);
-    }
-  }
-
-  private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status,
-    @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
-    try {
-      queue.remove(task, status, taskResult, error);
-    } catch (Exception e) {
-      LOG.error(format("Failed to finalize task with uuid '%s' and persist its state to db", task.getUuid()), e);
-    } finally {
-      stopActivityProfiler(ceProfiler, task, status);
-      ceLogging.clearForTask();
-    }
-  }
-
-  private static Profiler startActivityProfiler(CeTask task) {
-    Profiler profiler = Profiler.create(LOG);
-    addContext(profiler, task);
-    return profiler.startInfo("Execute task");
-  }
-
-  private static void stopActivityProfiler(Profiler profiler, CeTask task, CeActivityDto.Status status) {
-    addContext(profiler, task);
-    if (status == CeActivityDto.Status.FAILED) {
-      profiler.stopError("Executed task");
-    } else {
-      profiler.stopInfo("Executed task");
-    }
-  }
-
-  private static void addContext(Profiler profiler, CeTask task) {
-    profiler
-      .logTimeLast(true)
-      .addContext("project", task.getComponentKey())
-      .addContext("type", task.getType())
-      .addContext("id", task.getUuid());
-    String submitterLogin = task.getSubmitterLogin();
-    if (submitterLogin != null) {
-      profiler.addContext("submitter", submitterLogin);
-    }
-  }
-
-}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactory.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactory.java
new file mode 100644 (file)
index 0000000..65d9096
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.Set;
+
+/**
+ * A factory that will create the CeWorkerFactory with an UUID
+ */
+public interface CeWorkerFactory {
+  /**
+   * Create a new CeWorker object.
+   * Each {@link CeWorker} returned by this method will have a different UUID from the others and all of these UUIDS will be returned by {@link #getWorkerUUIDs()}.
+   *
+   * @return the CeWorker
+   */
+  CeWorker create();
+  /**
+   * @return  the UUIDs of each {@link CeWorker} object returned by {@link #create}.
+   */
+  Set<String> getWorkerUUIDs();
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java
new file mode 100644 (file)
index 0000000..e684caa
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package org.sonar.ce.taskprocessor;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.sonar.ce.log.CeLogging;
+import org.sonar.ce.queue.InternalCeQueue;
+import org.sonar.core.util.UuidFactory;
+
+import static com.google.common.collect.ImmutableSet.copyOf;
+
+public class CeWorkerFactoryImpl implements CeWorkerFactory {
+  private final UuidFactory uuidFactory;
+  private final Set<String> ceWorkerUUIDs = new HashSet<>();
+  private final InternalCeQueue queue;
+  private final CeLogging ceLogging;
+  private final CeTaskProcessorRepository taskProcessorRepository;
+
+  public CeWorkerFactoryImpl(InternalCeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository, UuidFactory uuidFactory) {
+    this.queue = queue;
+    this.ceLogging = ceLogging;
+    this.taskProcessorRepository = taskProcessorRepository;
+    this.uuidFactory= uuidFactory;
+  }
+
+  @Override
+  public CeWorker create() {
+    String uuid = uuidFactory.create();
+    ceWorkerUUIDs.add(uuid);
+    return new CeWorkerImpl(queue, ceLogging, taskProcessorRepository, uuid);
+  }
+
+  @Override
+  public Set<String> getWorkerUUIDs() {
+    return copyOf(ceWorkerUUIDs);
+  }
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
new file mode 100644 (file)
index 0000000..025f636
--- /dev/null
@@ -0,0 +1,144 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.ce.log.CeLogging;
+import org.sonar.ce.queue.CeTask;
+import org.sonar.ce.queue.CeTaskResult;
+import org.sonar.ce.queue.InternalCeQueue;
+import org.sonar.core.util.logs.Profiler;
+import org.sonar.db.ce.CeActivityDto;
+
+import static java.lang.String.format;
+
+public class CeWorkerImpl implements CeWorker {
+
+  private static final Logger LOG = Loggers.get(CeWorkerImpl.class);
+
+  private final InternalCeQueue queue;
+  private final CeLogging ceLogging;
+  private final CeTaskProcessorRepository taskProcessorRepository;
+  private final String uuid;
+
+  public CeWorkerImpl(InternalCeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository, String uuid) {
+    this.queue = queue;
+    this.ceLogging = ceLogging;
+    this.taskProcessorRepository = taskProcessorRepository;
+    this.uuid = uuid;
+  }
+
+  @Override
+  public Boolean call() throws Exception {
+    Optional<CeTask> ceTask = tryAndFindTaskToExecute();
+    if (!ceTask.isPresent()) {
+      return false;
+    }
+
+    try {
+      executeTask(ceTask.get());
+    } catch (Exception e) {
+      LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e);
+    }
+    return true;
+  }
+
+
+  private Optional<CeTask> tryAndFindTaskToExecute() {
+    try {
+      return queue.peek(uuid);
+    } catch (Exception e) {
+      LOG.error("Failed to pop the queue of analysis reports", e);
+    }
+    return Optional.empty();
+  }
+
+  @Override
+  public String getUUID() {
+    return uuid;
+  }
+
+  private void executeTask(CeTask task) {
+    ceLogging.initForTask(task);
+    Profiler ceProfiler = startActivityProfiler(task);
+
+    CeActivityDto.Status status = CeActivityDto.Status.FAILED;
+    CeTaskResult taskResult = null;
+    Throwable error = null;
+    try {
+      // TODO delegate the message to the related task processor, according to task type
+      Optional<CeTaskProcessor> taskProcessor = taskProcessorRepository.getForCeTask(task);
+      if (taskProcessor.isPresent()) {
+        taskResult = taskProcessor.get().process(task);
+        status = CeActivityDto.Status.SUCCESS;
+      } else {
+        LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType());
+        status = CeActivityDto.Status.FAILED;
+      }
+    } catch (Throwable e) {
+      LOG.error(format("Failed to execute task %s", task.getUuid()), e);
+      error = e;
+    } finally {
+      finalizeTask(task, ceProfiler, status, taskResult, error);
+    }
+  }
+
+  private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status,
+    @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
+    try {
+      queue.remove(task, status, taskResult, error);
+    } catch (Exception e) {
+      LOG.error(format("Failed to finalize task with uuid '%s' and persist its state to db", task.getUuid()), e);
+    } finally {
+      stopActivityProfiler(ceProfiler, task, status);
+      ceLogging.clearForTask();
+    }
+  }
+
+  private static Profiler startActivityProfiler(CeTask task) {
+    Profiler profiler = Profiler.create(LOG);
+    addContext(profiler, task);
+    return profiler.startInfo("Execute task");
+  }
+
+  private static void stopActivityProfiler(Profiler profiler, CeTask task, CeActivityDto.Status status) {
+    addContext(profiler, task);
+    if (status == CeActivityDto.Status.FAILED) {
+      profiler.stopError("Executed task");
+    } else {
+      profiler.stopInfo("Executed task");
+    }
+  }
+
+  private static void addContext(Profiler profiler, CeTask task) {
+    profiler
+      .logTimeLast(true)
+      .addContext("project", task.getComponentKey())
+      .addContext("type", task.getType())
+      .addContext("id", task.getUuid());
+    String submitterLogin = task.getSubmitterLogin();
+    if (submitterLogin != null) {
+      profiler.addContext("submitter", submitterLogin);
+    }
+  }
+}
index d3480186bd24693408a1ab0e5354baf11fdd7a44..43585cf63d12df9e3c5465e46bf548b417321208 100644 (file)
@@ -25,10 +25,12 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Delayed;
@@ -45,11 +47,14 @@ import org.junit.Test;
 import org.junit.rules.Timeout;
 import org.sonar.ce.configuration.CeConfigurationRule;
 
+import static com.google.common.collect.ImmutableList.copyOf;
+import static java.util.Collections.emptySet;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -62,17 +67,18 @@ public class CeProcessingSchedulerImplTest {
   public Timeout timeout = Timeout.seconds(60);
   @Rule
   public CeConfigurationRule ceConfiguration = new CeConfigurationRule();
-
-  private CeWorkerCallable ceWorkerRunnable = mock(CeWorkerCallable.class);
+  // Required to prevent an infinite loop
+  private CeWorker ceWorker = mock(CeWorker.class);
+  private CeWorkerFactory ceWorkerFactory = new TestCeWorkerFactory(ceWorker);
   private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService();
-  private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorkerRunnable, 2000L, TimeUnit.MILLISECONDS);
-  private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorkerRunnable);
+  private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorker, 2000L, MILLISECONDS);
+  private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorker);
 
-  private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerRunnable);
+  private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory);
 
   @Test
   public void polls_without_delay_when_CeWorkerCallable_returns_true() throws Exception {
-    when(ceWorkerRunnable.call())
+    when(ceWorker.call())
       .thenReturn(true)
       .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
 
@@ -86,7 +92,7 @@ public class CeProcessingSchedulerImplTest {
 
   @Test
   public void polls_without_delay_when_CeWorkerCallable_throws_Exception_but_not_Error() throws Exception {
-    when(ceWorkerRunnable.call())
+    when(ceWorker.call())
       .thenThrow(new Exception("Exception is followed by a poll without delay"))
       .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
 
@@ -100,7 +106,7 @@ public class CeProcessingSchedulerImplTest {
 
   @Test
   public void polls_with_regular_delay_when_CeWorkerCallable_returns_false() throws Exception {
-    when(ceWorkerRunnable.call())
+    when(ceWorker.call())
       .thenReturn(false)
       .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
 
@@ -114,7 +120,7 @@ public class CeProcessingSchedulerImplTest {
 
   @Test
   public void startScheduling_schedules_CeWorkerCallable_at_fixed_rate_run_head_of_queue() throws Exception {
-    when(ceWorkerRunnable.call())
+    when(ceWorker.call())
       .thenReturn(true)
       .thenReturn(true)
       .thenReturn(false)
@@ -144,7 +150,7 @@ public class CeProcessingSchedulerImplTest {
 
   @Test
   public void stop_cancels_next_polling_and_does_not_add_any_new_one() throws Exception {
-    when(ceWorkerRunnable.call())
+    when(ceWorker.call())
       .thenReturn(false)
       .thenReturn(true)
       .thenReturn(false)
@@ -182,21 +188,36 @@ public class CeProcessingSchedulerImplTest {
   }
 
   @Test
-  public void when_workerCount_is_more_than_1_as_many_CeWorkerCallable_are_scheduled() throws InterruptedException {
+  public void when_workerCount_is_more_than_1_as_many_CeWorkerCallable_are_scheduled() throws Exception {
     int workerCount = Math.abs(new Random().nextInt(10)) + 1;
-
     ceConfiguration.setWorkerCount(workerCount);
 
+    CeWorker[] workers = new CeWorker[workerCount];
+    for (int i = 0; i < workerCount; i++) {
+      workers[i] = mock(CeWorker.class);
+      when(workers[i].call())
+        .thenReturn(false)
+        .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+    }
+
     ListenableScheduledFuture listenableScheduledFuture = mock(ListenableScheduledFuture.class);
     CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class);
-    CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerRunnable);
-    when(processingExecutorService.schedule(ceWorkerRunnable, ceConfiguration.getQueuePollingDelay(), MILLISECONDS))
+    when(processingExecutorService.schedule(any(CeWorker.class), any(Long.class),any(TimeUnit.class))).thenReturn(listenableScheduledFuture);
+
+    CeWorkerFactory ceWorkerFactory = spy(new TestCeWorkerFactory(workers));
+    CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory);
+    when(processingExecutorService.schedule(ceWorker, ceConfiguration.getQueuePollingDelay(), MILLISECONDS))
         .thenReturn(listenableScheduledFuture);
 
     underTest.startScheduling();
+    // No exception from TestCeWorkerFactory must be thrown
 
-    verify(processingExecutorService, times(workerCount)).schedule(ceWorkerRunnable, ceConfiguration.getQueuePollingDelay(), MILLISECONDS);
+    // Verify that schedule has been called on all workers
+    for (int i = 0; i < workerCount; i++) {
+      verify(processingExecutorService).schedule(workers[i], ceConfiguration.getQueuePollingDelay(), MILLISECONDS);
+    }
     verify(listenableScheduledFuture, times(workerCount)).addListener(any(Runnable.class), eq(processingExecutorService));
+    verify(ceWorkerFactory, times(workerCount)).create();
   }
 
   private void startSchedulingAndRun() throws ExecutionException, InterruptedException {
@@ -206,6 +227,25 @@ public class CeProcessingSchedulerImplTest {
     processingExecutorService.runFutures();
   }
 
+  private class TestCeWorkerFactory implements CeWorkerFactory {
+    private final Iterator<CeWorker> ceWorkers;
+
+    private TestCeWorkerFactory(CeWorker... ceWorkers) {
+      this.ceWorkers = copyOf(ceWorkers).iterator();
+    }
+
+    @Override
+    public CeWorker create() {
+      // This will throw an NoSuchElementException if there are too many calls
+      return ceWorkers.next();
+    }
+
+    @Override
+    public Set<String> getWorkerUUIDs() {
+      return emptySet();
+    }
+  }
+
   /**
    * A synchronous implementation of {@link CeProcessingSchedulerExecutorService} which exposes a synchronous
    * method to execute futures it creates and exposes a method to retrieve logs of calls to
@@ -384,7 +424,7 @@ public class CeProcessingSchedulerImplTest {
         command.run();
       }
 
-      // ///////// unsupported operations ///////////
+      /////////// unsupported operations ///////////
 
       @Override
       public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
@@ -538,5 +578,4 @@ public class CeProcessingSchedulerImplTest {
         '}';
     }
   }
-
 }
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java
deleted file mode 100644 (file)
index 4fc5aee..0000000
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * SonarQube
- * Copyright (C) 2009-2017 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.ce.taskprocessor;
-
-import java.util.List;
-import java.util.Optional;
-import javax.annotation.Nullable;
-import org.junit.Rule;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.InOrder;
-import org.mockito.Mockito;
-import org.sonar.api.utils.log.LogTester;
-import org.sonar.api.utils.log.LoggerLevel;
-import org.sonar.ce.log.CeLogging;
-import org.sonar.ce.queue.CeTask;
-import org.sonar.ce.queue.InternalCeQueue;
-import org.sonar.db.ce.CeActivityDto;
-import org.sonar.db.ce.CeTaskTypes;
-import org.sonar.server.computation.task.projectanalysis.taskprocessor.ReportTaskProcessor;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-public class CeWorkerCallableImplTest {
-
-  @Rule
-  public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule();
-  @Rule
-  public LogTester logTester = new LogTester();
-
-  private InternalCeQueue queue = mock(InternalCeQueue.class);
-  private ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
-  private CeLogging ceLogging = spy(CeLogging.class);
-  private ArgumentCaptor<String> workerUuid = ArgumentCaptor.forClass(String.class);
-  private CeWorkerCallable underTest = new CeWorkerCallableImpl(queue, ceLogging, taskProcessorRepository);
-  private InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
-
-  @Test
-  public void no_pending_tasks_in_queue() throws Exception {
-    when(queue.peek(anyString())).thenReturn(Optional.empty());
-
-    assertThat(underTest.call()).isFalse();
-
-    verifyZeroInteractions(taskProcessor, ceLogging);
-  }
-
-  @Test
-  public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception {
-    CeTask task = createCeTask(null);
-    taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
-    when(queue.peek(anyString())).thenReturn(Optional.of(task));
-
-    assertThat(underTest.call()).isTrue();
-
-    verifyWorkerUuid();
-    inOrder.verify(ceLogging).initForTask(task);
-    inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, null);
-    inOrder.verify(ceLogging).clearForTask();
-  }
-
-  @Test
-  public void peek_and_process_task() throws Exception {
-    CeTask task = createCeTask(null);
-    taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
-    when(queue.peek(anyString())).thenReturn(Optional.of(task));
-
-    assertThat(underTest.call()).isTrue();
-
-    verifyWorkerUuid();
-    inOrder.verify(ceLogging).initForTask(task);
-    inOrder.verify(taskProcessor).process(task);
-    inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS, null, null);
-    inOrder.verify(ceLogging).clearForTask();
-  }
-
-  @Test
-  public void fail_to_process_task() throws Exception {
-    CeTask task = createCeTask(null);
-    when(queue.peek(anyString())).thenReturn(Optional.of(task));
-    taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
-    Throwable error = makeTaskProcessorFail(task);
-
-    assertThat(underTest.call()).isTrue();
-
-    verifyWorkerUuid();
-    inOrder.verify(ceLogging).initForTask(task);
-    inOrder.verify(taskProcessor).process(task);
-    inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, error);
-    inOrder.verify(ceLogging).clearForTask();
-  }
-
-  @Test
-  public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_success() throws Exception {
-    when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask(null)));
-    taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
-
-    underTest.call();
-
-    verifyWorkerUuid();
-    List<String> logs = logTester.logs(LoggerLevel.INFO);
-    assertThat(logs).hasSize(2);
-    for (int i = 0; i < 2; i++) {
-      assertThat(logs.get(i)).doesNotContain(" | submitter=");
-    }
-  }
-
-  @Test
-  public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_error() throws Exception {
-    CeTask ceTask = createCeTask(null);
-    when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
-    taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
-    makeTaskProcessorFail(ceTask);
-
-    underTest.call();
-
-    verifyWorkerUuid();
-    List<String> logs = logTester.logs(LoggerLevel.INFO);
-    assertThat(logs).hasSize(1);
-    assertThat(logs.get(0)).doesNotContain(" | submitter=");
-    logs = logTester.logs(LoggerLevel.ERROR);
-    assertThat(logs).hasSize(2);
-    for (int i = 0; i < 2; i++) {
-      assertThat(logs.get(i)).doesNotContain(" | submitter=");
-    }
-    assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
-  }
-
-  @Test
-  public void display_submitterLogin_in_logs_when_set_in_case_of_success() throws Exception {
-    when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask("FooBar")));
-    taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
-
-    underTest.call();
-
-    verifyWorkerUuid();
-    List<String> logs = logTester.logs(LoggerLevel.INFO);
-    assertThat(logs).hasSize(2);
-    assertThat(logs.get(0)).contains(" | submitter=FooBar");
-    assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
-    assertThat(logTester.logs(LoggerLevel.ERROR)).isEmpty();
-    assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
-  }
-
-  @Test
-  public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception {
-    CeTask ceTask = createCeTask("FooBar");
-    when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
-    taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
-    makeTaskProcessorFail(ceTask);
-
-    underTest.call();
-
-    verifyWorkerUuid();
-    List<String> logs = logTester.logs(LoggerLevel.INFO);
-    assertThat(logs).hasSize(1);
-    assertThat(logs.iterator().next()).contains(" | submitter=FooBar");
-    logs = logTester.logs(LoggerLevel.ERROR);
-    assertThat(logs).hasSize(2);
-    assertThat(logs.get(0)).isEqualTo("Failed to execute task " + ceTask.getUuid());
-    assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
-  }
-
-  @Test
-  public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception {
-    logTester.setLevel(LoggerLevel.DEBUG);
-
-    when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask("FooBar")));
-    taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
-
-    underTest.call();
-
-    verifyWorkerUuid();
-    List<String> logs = logTester.logs(LoggerLevel.INFO);
-    assertThat(logs).hasSize(2);
-    assertThat(logs.get(0)).contains(" | submitter=FooBar");
-    assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
-    assertThat(logTester.logs(LoggerLevel.ERROR)).isEmpty();
-    assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
-  }
-
-  @Test
-  public void display_start_at_debug_level_stop_at_error_level_for_console_if_DEBUG_is_enabled_and_task_failed() throws Exception {
-    logTester.setLevel(LoggerLevel.DEBUG);
-
-    CeTask ceTask = createCeTask("FooBar");
-    when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
-    taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
-    makeTaskProcessorFail(ceTask);
-
-    underTest.call();
-
-    verifyWorkerUuid();
-    List<String> logs = logTester.logs(LoggerLevel.INFO);
-    assertThat(logs).hasSize(1);
-    assertThat(logs.iterator().next()).contains(" | submitter=FooBar");
-    logs = logTester.logs(LoggerLevel.ERROR);
-    assertThat(logs).hasSize(2);
-    assertThat(logs.get(0)).isEqualTo("Failed to execute task " + ceTask.getUuid());
-    assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
-    assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
-  }
-
-  private void verifyWorkerUuid() {
-    verify(queue).peek(workerUuid.capture());
-    assertThat(workerUuid.getValue()).startsWith("uuid");
-  }
-
-  private static CeTask createCeTask(@Nullable String submitterLogin) {
-    return new CeTask.Builder()
-      .setOrganizationUuid("org1")
-      .setUuid("TASK_1").setType(CeTaskTypes.REPORT)
-      .setComponentUuid("PROJECT_1")
-      .setSubmitterLogin(submitterLogin)
-      .build();
-  }
-
-  private IllegalStateException makeTaskProcessorFail(CeTask task) {
-    IllegalStateException error = new IllegalStateException("simulate exception thrown by TaskProcessor#process");
-    doThrow(error).when(taskProcessor).process(task);
-    return error;
-  }
-}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java
new file mode 100644 (file)
index 0000000..1f3b4aa
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package org.sonar.ce.taskprocessor;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.junit.Test;
+import org.sonar.ce.log.CeLogging;
+import org.sonar.ce.queue.InternalCeQueue;
+import org.sonar.core.util.UuidFactoryImpl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class CeWorkerFactoryImplTest {
+  private CeWorkerFactoryImpl underTest = new CeWorkerFactoryImpl(mock(InternalCeQueue.class), mock(CeLogging.class),
+    mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE);
+
+  @Test
+  public void each_call_must_return_a_new_ceworker_with_unique_uuid() {
+    Set<CeWorker> ceWorkers = new HashSet<>();
+    Set<String> ceWorkerUUIDs = new HashSet<>();
+
+    for (int i = 0; i < 10; i++) {
+      CeWorker ceWorker = underTest.create();
+      ceWorkers.add(ceWorker);
+      ceWorkerUUIDs.add(ceWorker.getUUID());
+    }
+
+    assertThat(ceWorkers).hasSize(10);
+    assertThat(ceWorkerUUIDs).hasSize(10);
+  }
+
+  @Test
+  public void ceworker_created_by_factory_must_contain_uuid() {
+    CeWorker ceWorker = underTest.create();
+    assertThat(ceWorker.getUUID()).isNotEmpty();
+  }
+
+  @Test
+  public void CeWorkerFactory_has_an_empty_set_of_uuids_when_created() {
+    assertThat(underTest.getWorkerUUIDs()).isEmpty();
+  }
+
+  @Test
+  public void CeWorkerFactory_must_returns_the_uuids_of_worker() {
+    Set<String> ceWorkerUUIDs = new HashSet<>();
+
+    for (int i = 0; i < 10; i++) {
+      ceWorkerUUIDs.add(underTest.create().getUUID());
+    }
+
+    assertThat(underTest.getWorkerUUIDs()).isEqualTo(ceWorkerUUIDs);
+  }
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java
new file mode 100644 (file)
index 0000000..155507b
--- /dev/null
@@ -0,0 +1,254 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.sonar.api.utils.log.LogTester;
+import org.sonar.api.utils.log.LoggerLevel;
+import org.sonar.ce.log.CeLogging;
+import org.sonar.ce.queue.CeTask;
+import org.sonar.ce.queue.InternalCeQueue;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.server.computation.task.projectanalysis.taskprocessor.ReportTaskProcessor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+public class CeWorkerImplTest {
+
+  @Rule
+  public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule();
+  @Rule
+  public LogTester logTester = new LogTester();
+
+  private InternalCeQueue queue = mock(InternalCeQueue.class);
+  private ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
+  private CeLogging ceLogging = spy(CeLogging.class);
+  private ArgumentCaptor<String> workerUuid = ArgumentCaptor.forClass(String.class);
+  private CeWorker underTest = new CeWorkerImpl(queue, ceLogging, taskProcessorRepository, UUID.randomUUID().toString());
+  private InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
+
+  @Test
+  public void getUUID_must_return_the_uuid_of_constructor() {
+    String uuid = UUID.randomUUID().toString();
+    CeWorker underTest = new CeWorkerImpl(queue, ceLogging, taskProcessorRepository, uuid);
+    assertThat(underTest.getUUID()).isEqualTo(uuid);
+  }
+
+  @Test
+  public void no_pending_tasks_in_queue() throws Exception {
+    when(queue.peek(anyString())).thenReturn(Optional.empty());
+
+    assertThat(underTest.call()).isFalse();
+
+    verifyZeroInteractions(taskProcessor, ceLogging);
+  }
+
+  @Test
+  public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception {
+    CeTask task = createCeTask(null);
+    taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
+    when(queue.peek(anyString())).thenReturn(Optional.of(task));
+
+    assertThat(underTest.call()).isTrue();
+
+    verifyWorkerUuid();
+    inOrder.verify(ceLogging).initForTask(task);
+    inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, null);
+    inOrder.verify(ceLogging).clearForTask();
+  }
+
+  @Test
+  public void peek_and_process_task() throws Exception {
+    CeTask task = createCeTask(null);
+    taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
+    when(queue.peek(anyString())).thenReturn(Optional.of(task));
+
+    assertThat(underTest.call()).isTrue();
+
+    verifyWorkerUuid();
+    inOrder.verify(ceLogging).initForTask(task);
+    inOrder.verify(taskProcessor).process(task);
+    inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS, null, null);
+    inOrder.verify(ceLogging).clearForTask();
+  }
+
+  @Test
+  public void fail_to_process_task() throws Exception {
+    CeTask task = createCeTask(null);
+    when(queue.peek(anyString())).thenReturn(Optional.of(task));
+    taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
+    Throwable error = makeTaskProcessorFail(task);
+
+    assertThat(underTest.call()).isTrue();
+
+    verifyWorkerUuid();
+    inOrder.verify(ceLogging).initForTask(task);
+    inOrder.verify(taskProcessor).process(task);
+    inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, error);
+    inOrder.verify(ceLogging).clearForTask();
+  }
+
+  @Test
+  public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_success() throws Exception {
+    when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask(null)));
+    taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
+
+    underTest.call();
+
+    verifyWorkerUuid();
+    List<String> logs = logTester.logs(LoggerLevel.INFO);
+    assertThat(logs).hasSize(2);
+    for (int i = 0; i < 2; i++) {
+      assertThat(logs.get(i)).doesNotContain(" | submitter=");
+    }
+  }
+
+  @Test
+  public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_error() throws Exception {
+    CeTask ceTask = createCeTask(null);
+    when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+    taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
+    makeTaskProcessorFail(ceTask);
+
+    underTest.call();
+
+    verifyWorkerUuid();
+    List<String> logs = logTester.logs(LoggerLevel.INFO);
+    assertThat(logs).hasSize(1);
+    assertThat(logs.get(0)).doesNotContain(" | submitter=");
+    logs = logTester.logs(LoggerLevel.ERROR);
+    assertThat(logs).hasSize(2);
+    for (int i = 0; i < 2; i++) {
+      assertThat(logs.get(i)).doesNotContain(" | submitter=");
+    }
+    assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
+  }
+
+  @Test
+  public void display_submitterLogin_in_logs_when_set_in_case_of_success() throws Exception {
+    when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask("FooBar")));
+    taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
+
+    underTest.call();
+
+    verifyWorkerUuid();
+    List<String> logs = logTester.logs(LoggerLevel.INFO);
+    assertThat(logs).hasSize(2);
+    assertThat(logs.get(0)).contains(" | submitter=FooBar");
+    assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
+    assertThat(logTester.logs(LoggerLevel.ERROR)).isEmpty();
+    assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
+  }
+
+  @Test
+  public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception {
+    CeTask ceTask = createCeTask("FooBar");
+    when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+    taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
+    makeTaskProcessorFail(ceTask);
+
+    underTest.call();
+
+    verifyWorkerUuid();
+    List<String> logs = logTester.logs(LoggerLevel.INFO);
+    assertThat(logs).hasSize(1);
+    assertThat(logs.iterator().next()).contains(" | submitter=FooBar");
+    logs = logTester.logs(LoggerLevel.ERROR);
+    assertThat(logs).hasSize(2);
+    assertThat(logs.get(0)).isEqualTo("Failed to execute task " + ceTask.getUuid());
+    assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
+  }
+
+  @Test
+  public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception {
+    logTester.setLevel(LoggerLevel.DEBUG);
+
+    when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask("FooBar")));
+    taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
+
+    underTest.call();
+
+    verifyWorkerUuid();
+    List<String> logs = logTester.logs(LoggerLevel.INFO);
+    assertThat(logs).hasSize(2);
+    assertThat(logs.get(0)).contains(" | submitter=FooBar");
+    assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
+    assertThat(logTester.logs(LoggerLevel.ERROR)).isEmpty();
+    assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
+  }
+
+  @Test
+  public void display_start_at_debug_level_stop_at_error_level_for_console_if_DEBUG_is_enabled_and_task_failed() throws Exception {
+    logTester.setLevel(LoggerLevel.DEBUG);
+
+    CeTask ceTask = createCeTask("FooBar");
+    when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+    taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
+    makeTaskProcessorFail(ceTask);
+
+    underTest.call();
+
+    verifyWorkerUuid();
+    List<String> logs = logTester.logs(LoggerLevel.INFO);
+    assertThat(logs).hasSize(1);
+    assertThat(logs.iterator().next()).contains(" | submitter=FooBar");
+    logs = logTester.logs(LoggerLevel.ERROR);
+    assertThat(logs).hasSize(2);
+    assertThat(logs.get(0)).isEqualTo("Failed to execute task " + ceTask.getUuid());
+    assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
+    assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
+  }
+
+  private void verifyWorkerUuid() {
+    verify(queue).peek(workerUuid.capture());
+    assertThat(workerUuid.getValue()).startsWith(workerUuid.getValue());
+  }
+
+  private static CeTask createCeTask(@Nullable String submitterLogin) {
+    return new CeTask.Builder()
+      .setOrganizationUuid("org1")
+      .setUuid("TASK_1").setType(CeTaskTypes.REPORT)
+      .setComponentUuid("PROJECT_1")
+      .setSubmitterLogin(submitterLogin)
+      .build();
+  }
+
+  private IllegalStateException makeTaskProcessorFail(CeTask task) {
+    IllegalStateException error = new IllegalStateException("simulate exception thrown by TaskProcessor#process");
+    doThrow(error).when(taskProcessor).process(task);
+    return error;
+  }
+}