]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-12043 Refactor CeServer
authorDuarte Meneses <duarte.meneses@sonarsource.com>
Fri, 10 May 2019 18:48:20 +0000 (13:48 -0500)
committerSonarTech <sonartech@sonarsource.com>
Mon, 3 Jun 2019 18:21:22 +0000 (20:21 +0200)
* Improves waiting operations: no pooling, no additional threads
* Any number of threads can wait for stop (actually required if both stop and hard stop threads end up waiting)
* Prevents potentially multiple invocations of 'stopProcessing' by 'stop()' and the 'CeMainThread' after a hard stop

server/sonar-ce/src/main/java/org/sonar/ce/app/CeServer.java
server/sonar-ce/src/test/java/org/sonar/ce/app/CeServerTest.java
server/sonar-process/src/main/java/org/sonar/process/ProcessUtils.java [deleted file]
server/sonar-process/src/test/java/org/sonar/process/ProcessUtilsTest.java [deleted file]

index b1d33169eac6d85ac4afeb567c55c345eaf22e6d..f8f6b9faa84f34bc7da67195dd88832040d35c15 100644 (file)
@@ -21,7 +21,7 @@ package org.sonar.ce.app;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.CountDownLatch;
 import javax.annotation.Nullable;
 import org.sonar.api.utils.log.Logger;
 import org.sonar.api.utils.log.Loggers;
@@ -35,7 +35,6 @@ import org.sonar.process.ProcessEntryPoint;
 import org.sonar.process.Props;
 
 import static com.google.common.base.Preconditions.checkState;
-import static org.sonar.process.ProcessUtils.awaitTermination;
 
 /**
  * The Compute Engine server which starts a daemon thread to run the {@link ComputeEngineImpl} when it's {@link #start()}
@@ -49,11 +48,7 @@ public class CeServer implements Monitored {
 
   private static final String CE_MAIN_THREAD_NAME = "ce-main";
 
-  /**
-   * Thread that currently is inside our await() method.
-   */
-  private AtomicReference<Thread> awaitThread = new AtomicReference<>();
-  private volatile boolean stopAwait = false;
+  private CountDownLatch awaitStop = new CountDownLatch(1);
 
   private final ComputeEngine computeEngine;
   @Nullable
@@ -87,35 +82,30 @@ public class CeServer implements Monitored {
 
   @Override
   public void awaitStop() {
-    checkState(awaitThread.compareAndSet(null, Thread.currentThread()), "There can't be more than one thread waiting for the Compute Engine to stop");
     checkState(ceMainThread != null, "awaitStop() must not be called before start()");
-
-    try {
-      while (!stopAwait) {
-        try {
-          // wait for a quite long time but we will be interrupted if flag changes anyway
-          Thread.sleep(10_000);
-        } catch (InterruptedException e) {
-          // continue and check the flag
-        }
+    while (true) {
+      try {
+        awaitStop.await();
+        return;
+      } catch (InterruptedException e) {
+        // abort waiting
       }
-    } finally {
-      awaitThread = null;
     }
   }
 
   @Override
   public void stop() {
-    computeEngine.stopProcessing();
-    hardStop();
+    if (ceMainThread != null) {
+      ceMainThread.stopIt();
+      awaitStop();
+    }
   }
 
   @Override
   public void hardStop() {
     if (ceMainThread != null) {
-      // signal main Thread to stop
-      ceMainThread.stopIt();
-      awaitTermination(ceMainThread);
+      ceMainThread.stopItNow();
+      awaitStop();
     }
   }
 
@@ -133,10 +123,11 @@ public class CeServer implements Monitored {
   }
 
   private class CeMainThread extends Thread {
-    private static final int CHECK_FOR_STOP_DELAY = 50;
-    private volatile boolean stop = false;
+    private final CountDownLatch stopSignal = new CountDownLatch(1);
     private volatile boolean started = false;
     private volatile boolean operational = false;
+    private volatile boolean hardStop = false;
+    private volatile boolean dontInterrupt = false;
 
     public CeMainThread() {
       super(CE_MAIN_THREAD_NAME);
@@ -147,17 +138,27 @@ public class CeServer implements Monitored {
       boolean startupSuccessful = attemptStartup();
       this.operational = startupSuccessful;
       this.started = true;
-      if (startupSuccessful) {
-        // call below is blocking
-        waitForStopSignal();
-      } else {
-        stopAwait();
+      try {
+        if (startupSuccessful) {
+          try {
+            stopSignal.await();
+          } catch (InterruptedException e) {
+            // don't restore interrupt flag since it would be unset in attemptShutdown anyway
+          }
+
+          attemptShutdown();
+        }
+      } finally {
+        // release thread(s) waiting for CeServer to stop
+        signalAwaitStop();
       }
     }
 
     private boolean attemptStartup() {
       try {
-        startup();
+        LOG.info("Compute Engine starting up...");
+        computeEngine.startup();
+        LOG.info("Compute Engine is operational");
         return true;
       } catch (org.sonar.api.utils.MessageException | org.sonar.process.MessageException e) {
         LOG.error("Compute Engine startup failed: " + e.getMessage());
@@ -168,35 +169,19 @@ public class CeServer implements Monitored {
       }
     }
 
-    private void startup() {
-      LOG.info("Compute Engine starting up...");
-      computeEngine.startup();
-      LOG.info("Compute Engine is operational");
-    }
-
-    private void waitForStopSignal() {
-      while (!stop) {
-        try {
-          Thread.sleep(CHECK_FOR_STOP_DELAY);
-        } catch (InterruptedException e) {
-          // ignore the interruption itself
-          // Do not propagate the isInterrupted flag with Thread.currentThread().interrupt()
-          // It will break the shutdown of ComputeEngineContainerImpl#stop()
-        }
-      }
-      attemptShutdown();
-    }
-
     private void attemptShutdown() {
       try {
         LOG.info("Compute Engine is stopping...");
+        if (!hardStop) {
+          computeEngine.stopProcessing();
+        }
+        dontInterrupt = true;
+        // make sure that interrupt flag is unset because we don't want to interrupt shutdown of pico container
+        interrupted();
         computeEngine.shutdown();
         LOG.info("Compute Engine is stopped");
       } catch (Throwable e) {
         LOG.error("Compute Engine failed to stop", e);
-      } finally {
-        // release thread waiting for CeServer
-        stopAwait();
       }
     }
 
@@ -209,26 +194,21 @@ public class CeServer implements Monitored {
     }
 
     public void stopIt() {
-      // stop looping indefinitely
-      this.stop = true;
-      // interrupt current thread in case its waiting for WebServer
-      // TODO is the waiting during startup or shutdown? this will most likely cause the shutdown to always fail to finish cleanly
-      interrupt();
+      stopSignal.countDown();
     }
 
-    private void stopAwait() {
-      stopAwait = true;
-      Thread t = awaitThread.get();
-      if (t != null) {
-        t.interrupt();
-        try {
-          t.join(1_000);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          // Ignored
-        }
+    public void stopItNow() {
+      hardStop = true;
+      stopSignal.countDown();
+      // interrupt current thread unless it's already performing shutdown
+      if (!dontInterrupt) {
+        interrupt();
       }
     }
+
+    private void signalAwaitStop() {
+      awaitStop.countDown();
+    }
   }
 
 }
index ca451a6733ec465f0afa3247219fccd705cc8e06..4811501068f2d578a1225803875d2d78082c23ca 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.sonar.ce.app;
 
-import com.google.common.base.MoreObjects;
 import java.util.concurrent.CountDownLatch;
 import javax.annotation.CheckForNull;
 import javax.annotation.Nullable;
@@ -71,6 +70,16 @@ public class CeServerTest {
     assertThat(Thread.activeCount()).isSameAs(activeCount);
   }
 
+  @Test
+  public void awaitStop_throws_ISE_if_called_before_start() {
+    CeServer ceServer = newCeServer();
+
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage("awaitStop() must not be called before start()");
+
+    ceServer.awaitStop();
+  }
+
   @Test
   public void start_starts_a_new_Thread() {
     int activeCount = Thread.activeCount();
@@ -140,40 +149,6 @@ public class CeServerTest {
     assertThat(ceServer.getStatus()).isEqualTo(Monitored.Status.OPERATIONAL);
   }
 
-  @Test
-  public void awaitStop_throws_ISE_if_called_before_start() {
-    CeServer ceServer = newCeServer();
-
-    expectedException.expect(IllegalStateException.class);
-    expectedException.expectMessage("awaitStop() must not be called before start()");
-
-    ceServer.awaitStop();
-  }
-
-  @Test
-  public void awaitStop_throws_ISE_if_called_twice() {
-    final CeServer ceServer = newCeServer();
-    ExceptionCatcherWaitingThread waitingThread1 = new ExceptionCatcherWaitingThread(ceServer);
-    ExceptionCatcherWaitingThread waitingThread2 = new ExceptionCatcherWaitingThread(ceServer);
-
-    ceServer.start();
-
-    waitingThread1.start();
-    waitingThread2.start();
-
-    while (waitingThread1.isAlive() && waitingThread2.isAlive()) {
-      // wait for either thread to stop because ceServer.awaitStop() failed with an exception
-      // if none stops, the test will fail with timeout
-    }
-
-    Exception exception = MoreObjects.firstNonNull(waitingThread1.getException(), waitingThread2.getException());
-    assertThat(exception)
-      .isInstanceOf(IllegalStateException.class)
-      .hasMessage("There can't be more than one thread waiting for the Compute Engine to stop");
-
-    assertThat(waitingThread1.getException() != null && waitingThread2.getException() != null).isFalse();
-  }
-
   @Test
   public void awaitStop_keeps_blocking_calling_thread_even_if_calling_thread_is_interrupted_but_until_stop_is_called() throws Exception {
     final CeServer ceServer = newCeServer();
@@ -305,30 +280,6 @@ public class CeServerTest {
     }
   }
 
-  private static class ExceptionCatcherWaitingThread extends Thread {
-    private final CeServer ceServer;
-    @CheckForNull
-    private Exception exception = null;
-
-    public ExceptionCatcherWaitingThread(CeServer ceServer) {
-      this.ceServer = ceServer;
-    }
-
-    @Override
-    public void run() {
-      try {
-        ceServer.awaitStop();
-      } catch (Exception e) {
-        this.exception = e;
-      }
-    }
-
-    @CheckForNull
-    public Exception getException() {
-      return exception;
-    }
-  }
-
   private enum DoNothingComputeEngine implements ComputeEngine {
     INSTANCE;
 
diff --git a/server/sonar-process/src/main/java/org/sonar/process/ProcessUtils.java b/server/sonar-process/src/main/java/org/sonar/process/ProcessUtils.java
deleted file mode 100644 (file)
index 9b407d9..0000000
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * SonarQube
- * Copyright (C) 2009-2019 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.process;
-
-import javax.annotation.Nullable;
-
-public class ProcessUtils {
-
-  private ProcessUtils() {
-    // only static stuff
-  }
-
-  public static void awaitTermination(@Nullable Thread t) {
-    if (t == null || Thread.currentThread() == t) {
-      return;
-    }
-
-    while (t.isAlive()) {
-      try {
-        t.join();
-      } catch (InterruptedException e) {
-        // ignore, keep on waiting for t to stop
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-}
diff --git a/server/sonar-process/src/test/java/org/sonar/process/ProcessUtilsTest.java b/server/sonar-process/src/test/java/org/sonar/process/ProcessUtilsTest.java
deleted file mode 100644 (file)
index befedf8..0000000
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * SonarQube
- * Copyright (C) 2009-2019 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.process;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.DisableOnDebug;
-import org.junit.rules.TestRule;
-import org.junit.rules.Timeout;
-import org.sonar.test.TestUtils;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.sonar.process.ProcessUtils.awaitTermination;
-
-public class ProcessUtilsTest {
-
-  @Rule
-  public TestRule safeguardTimeout = new DisableOnDebug(Timeout.seconds(60));
-
-  @Test
-  public void private_constructor() {
-    assertThat(TestUtils.hasOnlyPrivateConstructors(ProcessUtils.class)).isTrue();
-  }
-
-  @Test
-  public void awaitTermination_does_not_fail_on_null_Thread_argument() {
-    awaitTermination(null);
-  }
-
-  @Test
-  public void awaitTermination_does_not_wait_on_currentThread() {
-    awaitTermination(Thread.currentThread());
-  }
-
-  @Test
-  public void awaitTermination_ignores_interrupted_exception_of_current_thread() throws InterruptedException {
-    final EverRunningThread runningThread = new EverRunningThread();
-    final Thread safeJoiner = new Thread(() -> awaitTermination(runningThread));
-    final Thread simpleJoiner = new Thread(() -> {
-      try {
-        runningThread.join();
-      } catch (InterruptedException e) {
-        System.err.println("runningThread interruption detected in SimpleJoiner");
-      }
-    });
-    runningThread.start();
-    safeJoiner.start();
-    simpleJoiner.start();
-
-    // interrupt safeJoiner _before simpleJoiner to work around some arbitrary sleep delay_ which should not stop watching
-    safeJoiner.interrupt();
-
-    // interrupting simpleJoiner which should stop
-    simpleJoiner.interrupt();
-
-    while (simpleJoiner.isAlive()) {
-      // wait for simpleJoiner to stop
-    }
-
-    // safeJoiner must still be alive
-    assertThat(safeJoiner.isAlive()).isTrue();
-
-    // stop runningThread
-    runningThread.stopIt();
-
-    while (runningThread.isAlive()) {
-      // wait for runningThread to stop
-    }
-
-    // wait for safeJoiner to stop because runningThread has stopped, if it doesn't, the test will fail with a timeout
-    safeJoiner.join();
-  }
-
-  private static class EverRunningThread extends Thread {
-    private volatile boolean stop = false;
-
-    @Override
-    public void run() {
-      while (!stop) {
-        // infinite loop!
-      }
-    }
-
-    public void stopIt() {
-      this.stop = true;
-    }
-  }
-
-}