]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-7088 faster consumption of pending tasks in CE 667/head
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Tue, 1 Dec 2015 16:20:09 +0000 (17:20 +0100)
committerSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Wed, 2 Dec 2015 17:13:05 +0000 (18:13 +0100)
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorService.java
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImpl.java
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnable.java
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImpl.java
server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImplTest.java
server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImplTest.java

index 3fc101ed7ab043dbe70a418f6d2e728189f520de..33d02bf48aa9f0412f7be6a67e13e26e5cfe0a33 100644 (file)
  */
 package org.sonar.server.computation.taskprocessor;
 
-import org.sonar.server.util.StoppableScheduledExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import org.sonar.server.util.StoppableExecutorService;
 
 /**
  * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerRunnableImpl}.
  */
-public interface CeProcessingSchedulerExecutorService extends StoppableScheduledExecutorService {
+public interface CeProcessingSchedulerExecutorService extends StoppableExecutorService, ListeningScheduledExecutorService {
 }
index 8ea574664390141efa44d880ec54e54372eb60f9..3970c17d4602c9321e7242b8f1d020e64d324371 100644 (file)
  */
 package org.sonar.server.computation.taskprocessor;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import org.sonar.server.util.AbstractStoppableScheduledExecutorServiceImpl;
+import java.util.concurrent.TimeUnit;
+import org.sonar.server.util.AbstractStoppableExecutorService;
 
-public class CeProcessingSchedulerExecutorServiceImpl extends AbstractStoppableScheduledExecutorServiceImpl<ScheduledExecutorService>
+public class CeProcessingSchedulerExecutorServiceImpl extends AbstractStoppableExecutorService<ListeningScheduledExecutorService>
   implements CeProcessingSchedulerExecutorService {
   private static final String THREAD_NAME_PREFIX = "ce-processor-";
 
   public CeProcessingSchedulerExecutorServiceImpl() {
     super(
-      Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder()
-          .setNameFormat(THREAD_NAME_PREFIX + "%d")
-          .setPriority(Thread.MIN_PRIORITY)
-          .build()));
+      MoreExecutors.listeningDecorator(
+        Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder()
+            .setNameFormat(THREAD_NAME_PREFIX + "%d")
+            .setPriority(Thread.MIN_PRIORITY)
+            .build())));
   }
 
+  @Override
+  public <T> ListenableFuture<T> submit(Callable<T> task) {
+    return delegate.submit(task);
+  }
+
+  @Override
+  public <T> ListenableFuture<T> submit(Runnable task, T result) {
+    return delegate.submit(task, result);
+  }
+
+  @Override
+  public ListenableFuture<?> submit(Runnable task) {
+    return delegate.submit(task);
+  }
+
+  @Override
+  public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+    return delegate.schedule(command, delay, unit);
+  }
+
+  @Override
+  public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+    return delegate.schedule(callable, delay, unit);
+  }
+
+  @Override
+  public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+    return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
+  }
+
+  @Override
+  public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+    return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+  }
 }
index 4737c4010dde52147a35166bc02725359b731a71..f7c58020800d046c0c330802fcc574df9dd94d02 100644 (file)
  */
 package org.sonar.server.computation.taskprocessor;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+
+import static com.google.common.util.concurrent.Futures.addCallback;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 
 public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
+  private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class);
+
   private final CeProcessingSchedulerExecutorService executorService;
   private final CeWorkerRunnable workerRunnable;
 
   private final long delayBetweenTasks;
-  private final long delayForFirstStart;
   private final TimeUnit timeUnit;
 
   public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerRunnable workerRunnable) {
@@ -34,13 +44,43 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
     this.workerRunnable = workerRunnable;
 
     this.delayBetweenTasks = 2;
-    this.delayForFirstStart = 0;
-    this.timeUnit = TimeUnit.SECONDS;
+    this.timeUnit = SECONDS;
   }
 
   @Override
   public void startScheduling() {
-    executorService.scheduleAtFixedRate(workerRunnable, delayForFirstStart, delayBetweenTasks, timeUnit);
+    ListenableScheduledFuture<Boolean> future = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit);
+
+    FutureCallback<Boolean> chainingCallback = new ChainingCallback();
+    addCallback(future, chainingCallback, executorService);
   }
 
+  private class ChainingCallback implements FutureCallback<Boolean> {
+    @Override
+    public void onSuccess(@Nullable Boolean result) {
+      if (result != null && result) {
+        chainWithoutDelay();
+      } else {
+        chainTask(delayBetweenTasks, timeUnit);
+      }
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      if (!(t instanceof Error)) {
+        chainWithoutDelay();
+      } else {
+        LOG.error("Compute Engine execution failed. Scheduled processing interrupted.", t);
+      }
+    }
+
+    private void chainWithoutDelay() {
+      chainTask(1, MILLISECONDS);
+    }
+
+    private void chainTask(long delay, TimeUnit unit) {
+      ListenableScheduledFuture<Boolean> future = executorService.schedule(workerRunnable, delay, unit);
+      addCallback(future, this, executorService);
+    }
+  }
 }
index 2df1a66743eea9e428461a4f0561caad2f768320..d6c8fb86b8963648f3c51c61ad5adc3f5f2506f6 100644 (file)
  */
 package org.sonar.server.computation.taskprocessor;
 
+import java.util.concurrent.Callable;
 import org.sonar.server.computation.queue.CeQueue;
 import org.sonar.server.computation.queue.CeTask;
 
 /**
  * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}.
  */
-public interface CeWorkerRunnable extends Runnable {
+public interface CeWorkerRunnable extends Callable<Boolean> {
 }
index e20b6ac647e204499c7bb8614621c84773398e5f..a7b28ee9367e8ba84a1a7ed1e952b4d107eb2c3c 100644 (file)
@@ -46,13 +46,14 @@ public class CeWorkerRunnableImpl implements CeWorkerRunnable {
   }
 
   @Override
-  public void run() {
+  public Boolean call() throws Exception {
     Optional<CeTask> ceTask = tryAndFindTaskToExecute();
     if (!ceTask.isPresent()) {
-      return;
+      return false;
     }
 
     executeTask(ceTask.get());
+    return true;
   }
 
   private Optional<CeTask> tryAndFindTaskToExecute() {
index c94bbd4312de35b83a76ec4d31c235681cb1d473..f50ab2c8e48e15eff88e33353b2a979aab90a610 100644 (file)
  */
 package org.sonar.server.computation.taskprocessor;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.Immutable;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
 public class CeProcessingSchedulerImplTest {
-  private CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class);
-  private CeWorkerRunnable workerRunnable = mock(CeWorkerRunnable.class);
-  private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, workerRunnable);
+  private static final Error ERROR_TO_INTERRUPT_CHAINING = new Error("Error should stop scheduling");
+
+  @Rule
+  // due to risks of infinite chaining of tasks/futures, a timeout is required for safety
+  public Timeout timeout = Timeout.seconds(1);
+
+  private CeWorkerRunnable ceWorkerRunnable = mock(CeWorkerRunnable.class);
+  private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService();
+  private ScheduleCall regularDelayedPoll = new ScheduleCall(ceWorkerRunnable, 2L, TimeUnit.SECONDS);
+  private ScheduleCall notDelayedPoll = new ScheduleCall(ceWorkerRunnable, 1L, TimeUnit.MILLISECONDS);
+
+  private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, ceWorkerRunnable);
+
+  @Test
+  public void polls_without_delay_when_CeWorkerRunnable_returns_true() throws Exception {
+    when(ceWorkerRunnable.call())
+      .thenReturn(true)
+      .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+    startSchedulingAndRun();
+
+    assertThat(processingExecutorService.getScheduleCalls()).containsOnly(
+      regularDelayedPoll,
+      notDelayedPoll
+      );
+  }
+
+  @Test
+  public void polls_without_delay_when_CeWorkerRunnable_throws_Exception_but_not_Error() throws Exception {
+    when(ceWorkerRunnable.call())
+      .thenThrow(new Exception("Exception is followed by a poll without delay"))
+      .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+    startSchedulingAndRun();
+
+    assertThat(processingExecutorService.getScheduleCalls()).containsExactly(
+      regularDelayedPoll,
+      notDelayedPoll
+      );
+  }
+
+  @Test
+  public void polls_with_regular_delay_when_CeWorkerRunnable_returns_false() throws Exception {
+    when(ceWorkerRunnable.call())
+      .thenReturn(false)
+      .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+    startSchedulingAndRun();
+
+    assertThat(processingExecutorService.getScheduleCalls()).containsExactly(
+      regularDelayedPoll,
+      regularDelayedPoll
+      );
+  }
 
   @Test
-  public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() {
+  public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() throws Exception {
+    when(ceWorkerRunnable.call())
+      .thenReturn(true)
+      .thenReturn(true)
+      .thenReturn(false)
+      .thenReturn(true)
+      .thenReturn(false)
+      .thenThrow(new Exception("IAE should not cause scheduling to stop"))
+      .thenReturn(false)
+      .thenReturn(false)
+      .thenReturn(false)
+      .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+    startSchedulingAndRun();
+
+    assertThat(processingExecutorService.getScheduleCalls()).containsExactly(
+      regularDelayedPoll,
+      notDelayedPoll,
+      notDelayedPoll,
+      regularDelayedPoll,
+      notDelayedPoll,
+      regularDelayedPoll,
+      notDelayedPoll,
+      regularDelayedPoll,
+      regularDelayedPoll,
+      regularDelayedPoll
+      );
+  }
+
+  private void startSchedulingAndRun() throws ExecutionException, InterruptedException {
     underTest.startScheduling();
 
-    verify(processingExecutorService).scheduleAtFixedRate(same(workerRunnable), eq(0L), eq(2L), eq(TimeUnit.SECONDS));
-    verifyNoMoreInteractions(processingExecutorService);
+    // execute future synchronously
+    processingExecutorService.runFutures();
+  }
+
+  /**
+   * A synchronous implementation of {@link CeProcessingSchedulerExecutorService} which exposes a synchronous
+   * method to execute futures it creates and exposes a method to retrieve logs of calls to
+   * {@link CeProcessingSchedulerExecutorService#schedule(Callable, long, TimeUnit)} which is used by
+   * {@link CeProcessingSchedulerImpl}.
+   */
+  private static class StubCeProcessingSchedulerExecutorService implements CeProcessingSchedulerExecutorService {
+
+    private final Queue<ScheduledFuture<?>> futures = new ConcurrentLinkedQueue<>();
+    private final ListeningScheduledExecutorService delegate = MoreExecutors.listeningDecorator(new SynchronousStubExecutorService());
+
+    private final List<ScheduleCall> scheduleCalls = new ArrayList<>();
+
+    public List<ScheduleCall> getScheduleCalls() {
+      return scheduleCalls;
+    }
+
+    public void runFutures() throws ExecutionException, InterruptedException {
+      while (futures.peek() != null) {
+        futures.poll().get();
+      }
+    }
+
+    @Override
+    public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+      this.scheduleCalls.add(new ScheduleCall(callable, delay, unit));
+      return delegate.schedule(callable, delay, unit);
+    }
+
+    @Override
+    public void stop() {
+      throw new UnsupportedOperationException("stop() not implemented");
+    }
+
+    // ////////////// delegated methods ////////////////
+
+    @Override
+    public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+      return delegate.schedule(command, delay, unit);
+    }
+
+    @Override
+    public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
+    }
+
+    @Override
+    public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+    }
+
+    @Override
+    public void shutdown() {
+      delegate.shutdown();
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+      return delegate.shutdownNow();
+    }
+
+    @Override
+    public boolean isShutdown() {
+      return delegate.isShutdown();
+    }
+
+    @Override
+    public boolean isTerminated() {
+      return delegate.isTerminated();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+      return delegate.awaitTermination(timeout, unit);
+    }
+
+    @Override
+    public <T> ListenableFuture<T> submit(Callable<T> task) {
+      return delegate.submit(task);
+    }
+
+    @Override
+    public <T> ListenableFuture<T> submit(Runnable task, T result) {
+      return delegate.submit(task, result);
+    }
+
+    @Override
+    public ListenableFuture<?> submit(Runnable task) {
+      return delegate.submit(task);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+      return delegate.invokeAll(tasks);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+      return delegate.invokeAll(tasks, timeout, unit);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+      return delegate.invokeAny(tasks);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+      return delegate.invokeAny(tasks, timeout, unit);
+    }
+
+    @Override
+    public void execute(Runnable command) {
+      delegate.execute(command);
+    }
+
+    /**
+     * A partial (only 3 methods) implementation of ScheduledExecutorService which stores futures it creates into
+     * {@link StubCeProcessingSchedulerExecutorService#futures}.
+     */
+    private class SynchronousStubExecutorService implements ScheduledExecutorService {
+      @Override
+      public ScheduledFuture<?> schedule(final Runnable command, long delay, TimeUnit unit) {
+        ScheduledFuture<Void> res = new AbstractPartiallyImplementedScheduledFuture<Void>() {
+          @Override
+          public Void get() throws InterruptedException, ExecutionException {
+            command.run();
+            return null;
+          }
+        };
+        futures.add(res);
+        return res;
+      }
+
+      @Override
+      public <V> ScheduledFuture<V> schedule(final Callable<V> callable, long delay, TimeUnit unit) {
+        ScheduledFuture<V> res = new AbstractPartiallyImplementedScheduledFuture<V>() {
+
+          @Override
+          public V get() throws InterruptedException, ExecutionException {
+            try {
+              return callable.call();
+            } catch (Exception e) {
+              throw new ExecutionException(e);
+            }
+          }
+        };
+        futures.add(res);
+        return res;
+      }
+
+      @Override
+      public void execute(Runnable command) {
+        command.run();
+      }
+
+      // ///////// unsupported operations ///////////
+
+      @Override
+      public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+        throw new UnsupportedOperationException("scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) not implemented");
+      }
+
+      @Override
+      public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+        throw new UnsupportedOperationException("scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) not implemented");
+      }
+
+      @Override
+      public void shutdown() {
+        throw new UnsupportedOperationException("shutdown() not implemented");
+      }
+
+      @Override
+      public List<Runnable> shutdownNow() {
+        throw new UnsupportedOperationException("shutdownNow() not implemented");
+      }
+
+      @Override
+      public boolean isShutdown() {
+        throw new UnsupportedOperationException("isShutdown() not implemented");
+      }
+
+      @Override
+      public boolean isTerminated() {
+        throw new UnsupportedOperationException("isTerminated() not implemented");
+      }
+
+      @Override
+      public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        throw new UnsupportedOperationException("awaitTermination(long timeout, TimeUnit unit) not implemented");
+      }
+
+      @Override
+      public <T> Future<T> submit(Callable<T> task) {
+        throw new UnsupportedOperationException("submit(Callable<T> task) not implemented");
+      }
+
+      @Override
+      public <T> Future<T> submit(Runnable task, T result) {
+        throw new UnsupportedOperationException("submit(Runnable task, T result) not implemented");
+      }
+
+      @Override
+      public Future<?> submit(Runnable task) {
+        throw new UnsupportedOperationException("submit(Runnable task) not implemented");
+      }
+
+      @Override
+      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+        throw new UnsupportedOperationException("invokeAll(Collection<? extends Callable<T>> tasks) not implemented");
+      }
+
+      @Override
+      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+        throw new UnsupportedOperationException("invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) not implemented");
+      }
+
+      @Override
+      public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+        throw new UnsupportedOperationException("invokeAny(Collection<? extends Callable<T>> tasks) not implemented");
+      }
+
+      @Override
+      public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        throw new UnsupportedOperationException("invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) not implemented");
+      }
+    }
+  }
+
+  private static abstract class AbstractPartiallyImplementedScheduledFuture<V> implements ScheduledFuture<V> {
+    @Override
+    public long getDelay(TimeUnit unit) {
+      throw new UnsupportedOperationException("XXX not implemented");
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+      throw new UnsupportedOperationException("XXX not implemented");
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      throw new UnsupportedOperationException("XXX not implemented");
+    }
+
+    @Override
+    public boolean isCancelled() {
+      throw new UnsupportedOperationException("XXX not implemented");
+    }
+
+    @Override
+    public boolean isDone() {
+      throw new UnsupportedOperationException("XXX not implemented");
+    }
+
+    @Override
+    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+      throw new UnsupportedOperationException("XXX not implemented");
+    }
+  }
+
+  /**
+   * Used to log parameters of calls to {@link CeProcessingSchedulerExecutorService#schedule(Callable, long, TimeUnit)}
+   */
+  @Immutable
+  private static final class ScheduleCall {
+    private final Callable<?> callable;
+    private final long delay;
+    private final TimeUnit unit;
+
+    private ScheduleCall(Callable<?> callable, long delay, TimeUnit unit) {
+      this.callable = callable;
+      this.delay = delay;
+      this.unit = unit;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ScheduleCall that = (ScheduleCall) o;
+      return delay == that.delay && callable == that.callable && unit.equals(that.unit);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(callable, delay, unit);
+    }
+
+    @Override
+    public String toString() {
+      return "ScheduleCall{" +
+        "callable=" + callable +
+        ", delay=" + delay +
+        ", unit=" + unit +
+        '}';
+    }
   }
 
 }
index a95d8365421ed828aafe0221698ad936f51fdb72..305c5e321d48c8b988468d19c25953aa77d3b4b0 100644 (file)
@@ -32,6 +32,7 @@ import org.sonar.server.computation.queue.CeQueueImpl;
 import org.sonar.server.computation.queue.CeTask;
 import org.sonar.server.computation.taskprocessor.report.ReportTaskProcessor;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verifyZeroInteractions;
@@ -52,18 +53,18 @@ public class CeWorkerRunnableImplTest {
   public void no_pending_tasks_in_queue() throws Exception {
     when(queue.peek()).thenReturn(Optional.<CeTask>absent());
 
-    underTest.run();
+    assertThat(underTest.call()).isFalse();
 
     verifyZeroInteractions(taskProcessor, ceLogging);
   }
 
   @Test
-  public void fail_when_no_CeTaskProcessor_is_found_in_repository() {
+  public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception {
     CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
     taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
     when(queue.peek()).thenReturn(Optional.of(task));
 
-    underTest.run();
+    assertThat(underTest.call()).isTrue();
 
     inOrder.verify(ceLogging).initForTask(task);
     inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED);
@@ -76,7 +77,7 @@ public class CeWorkerRunnableImplTest {
     taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
     when(queue.peek()).thenReturn(Optional.of(task));
 
-    underTest.run();
+    assertThat(underTest.call()).isTrue();
 
     inOrder.verify(ceLogging).initForTask(task);
     inOrder.verify(taskProcessor).process(task);
@@ -91,7 +92,7 @@ public class CeWorkerRunnableImplTest {
     taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
     doThrow(new IllegalStateException("simulate exception thrown by TaskProcessor#process")).when(taskProcessor).process(task);
 
-    underTest.run();
+    assertThat(underTest.call()).isTrue();
 
     inOrder.verify(ceLogging).initForTask(task);
     inOrder.verify(taskProcessor).process(task);