@@ -25,7 +25,6 @@ import java.util.List; | |||
import java.util.Map; | |||
import java.util.Optional; | |||
import java.util.concurrent.atomic.AtomicBoolean; | |||
import javax.annotation.Nonnull; | |||
import org.sonar.process.NetworkUtilsImpl; | |||
import org.sonar.process.ProcessId; | |||
@@ -36,7 +35,7 @@ public class AppStateImpl implements AppState { | |||
private final AtomicBoolean webLeaderLocked = new AtomicBoolean(false); | |||
@Override | |||
public void addListener(@Nonnull AppStateListener listener) { | |||
public void addListener(AppStateListener listener) { | |||
this.listeners.add(listener); | |||
} | |||
@@ -69,7 +69,7 @@ class NodeLifecycle { | |||
res.put(OPERATIONAL, toSet(STOPPING, STOPPED)); | |||
res.put(STOPPING, toSet(STOPPED)); | |||
res.put(STOPPED, toSet(STARTING)); | |||
return res; | |||
return Collections.unmodifiableMap(res); | |||
} | |||
private static Set<State> toSet(State... states) { |
@@ -51,7 +51,7 @@ public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLi | |||
private final AppState appState; | |||
private final NodeLifecycle nodeLifecycle = new NodeLifecycle(); | |||
private final CountDownLatch keepAlive = new CountDownLatch(1); | |||
private final CountDownLatch awaitTermination = new CountDownLatch(1); | |||
private final AtomicBoolean firstWaitingEsLog = new AtomicBoolean(true); | |||
private final AtomicBoolean restartRequested = new AtomicBoolean(false); | |||
private final AtomicBoolean restartDisabled = new AtomicBoolean(false); | |||
@@ -63,8 +63,7 @@ public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLi | |||
private long processWatcherDelayMs = SQProcess.DEFAULT_WATCHER_DELAY_MS; | |||
public SchedulerImpl(AppSettings settings, AppReloader appReloader, CommandFactory commandFactory, | |||
ProcessLauncher processLauncher, | |||
AppState appState) { | |||
ProcessLauncher processLauncher, AppState appState) { | |||
this.settings = settings; | |||
this.appReloader = appReloader; | |||
this.commandFactory = commandFactory; | |||
@@ -203,13 +202,13 @@ public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLi | |||
if (restarterThread != null) { | |||
restarterThread.interrupt(); | |||
} | |||
keepAlive.countDown(); | |||
awaitTermination.countDown(); | |||
} | |||
@Override | |||
public void awaitTermination() { | |||
try { | |||
keepAlive.await(); | |||
awaitTermination.await(); | |||
} catch (InterruptedException e) { | |||
Thread.currentThread().interrupt(); | |||
} |
@@ -34,7 +34,6 @@ import java.util.EnumMap; | |||
import java.util.List; | |||
import java.util.Map; | |||
import java.util.Optional; | |||
import java.util.concurrent.locks.Lock; | |||
import org.elasticsearch.cluster.health.ClusterHealthStatus; | |||
import org.slf4j.Logger; | |||
import org.slf4j.LoggerFactory; | |||
@@ -122,21 +121,7 @@ public class ClusterAppStateImpl implements ClusterAppState { | |||
@Override | |||
public boolean tryToLockWebLeader() { | |||
IAtomicReference<String> leader = hzMember.getAtomicReference(LEADER); | |||
if (leader.get() == null) { | |||
Lock lock = hzMember.getLock(LEADER); | |||
lock.lock(); | |||
try { | |||
if (leader.get() == null) { | |||
leader.set(hzMember.getUuid()); | |||
return true; | |||
} | |||
return false; | |||
} finally { | |||
lock.unlock(); | |||
} | |||
} else { | |||
return false; | |||
} | |||
return leader.compareAndSet(null, hzMember.getUuid()); | |||
} | |||
@Override | |||
@@ -147,44 +132,28 @@ public class ClusterAppStateImpl implements ClusterAppState { | |||
@Override | |||
public void registerSonarQubeVersion(String sonarqubeVersion) { | |||
IAtomicReference<String> sqVersion = hzMember.getAtomicReference(SONARQUBE_VERSION); | |||
if (sqVersion.get() == null) { | |||
Lock lock = hzMember.getLock(SONARQUBE_VERSION); | |||
lock.lock(); | |||
try { | |||
if (sqVersion.get() == null) { | |||
sqVersion.set(sonarqubeVersion); | |||
} | |||
} finally { | |||
lock.unlock(); | |||
} | |||
} | |||
boolean wasSet = sqVersion.compareAndSet(null, sonarqubeVersion); | |||
String clusterVersion = sqVersion.get(); | |||
if (!sqVersion.get().equals(sonarqubeVersion)) { | |||
throw new IllegalStateException( | |||
format("The local version %s is not the same as the cluster %s", sonarqubeVersion, clusterVersion)); | |||
if (!wasSet) { | |||
String clusterVersion = sqVersion.get(); | |||
if (!sqVersion.get().equals(sonarqubeVersion)) { | |||
throw new IllegalStateException( | |||
format("The local version %s is not the same as the cluster %s", sonarqubeVersion, clusterVersion)); | |||
} | |||
} | |||
} | |||
@Override | |||
public void registerClusterName(String clusterName) { | |||
IAtomicReference<String> property = hzMember.getAtomicReference(CLUSTER_NAME); | |||
if (property.get() == null) { | |||
Lock lock = hzMember.getLock(CLUSTER_NAME); | |||
lock.lock(); | |||
try { | |||
if (property.get() == null) { | |||
property.set(clusterName); | |||
} | |||
} finally { | |||
lock.unlock(); | |||
} | |||
} | |||
boolean wasSet = property.compareAndSet(null, clusterName); | |||
String clusterValue = property.get(); | |||
if (!property.get().equals(clusterName)) { | |||
throw new MessageException( | |||
format("This node has a cluster name [%s], which does not match [%s] from the cluster", clusterName, clusterValue)); | |||
if (!wasSet) { | |||
String clusterValue = property.get(); | |||
if (!property.get().equals(clusterName)) { | |||
throw new MessageException( | |||
format("This node has a cluster name [%s], which does not match [%s] from the cluster", clusterName, clusterValue)); | |||
} | |||
} | |||
} | |||
@@ -20,6 +20,7 @@ | |||
package org.sonar.application.cluster; | |||
import java.io.Serializable; | |||
import java.util.Objects; | |||
import org.sonar.process.ProcessId; | |||
import static java.util.Objects.requireNonNull; | |||
@@ -58,8 +59,6 @@ public class ClusterProcess implements Serializable { | |||
@Override | |||
public int hashCode() { | |||
int result = processId.hashCode(); | |||
result = 31 * result + nodeUuid.hashCode(); | |||
return result; | |||
return Objects.hash(processId, nodeUuid); | |||
} | |||
} |
@@ -66,7 +66,7 @@ public class Lifecycle { | |||
res.put(STARTED, toSet(STOPPING, STOPPED)); | |||
res.put(STOPPING, toSet(STOPPED)); | |||
res.put(STOPPED, toSet()); | |||
return res; | |||
return Collections.unmodifiableMap(res); | |||
} | |||
private static Set<State> toSet(State... states) { |
@@ -20,7 +20,6 @@ | |||
package org.sonar.application.process; | |||
import org.sonar.application.FileSystem; | |||
import org.sonar.application.Scheduler; | |||
import org.sonar.application.config.AppSettings; | |||
import org.sonar.process.ProcessId; | |||
import org.sonar.process.sharedmemoryfile.DefaultProcessCommands; | |||
@@ -33,24 +32,24 @@ public class StopRequestWatcherImpl extends Thread implements StopRequestWatcher | |||
private static final long DEFAULT_WATCHER_DELAY_MS = 500L; | |||
private final ProcessCommands commands; | |||
private final Scheduler scheduler; | |||
private final Runnable listener; | |||
private final AppSettings settings; | |||
private long delayMs = DEFAULT_WATCHER_DELAY_MS; | |||
StopRequestWatcherImpl(AppSettings settings, Scheduler scheduler, ProcessCommands commands) { | |||
StopRequestWatcherImpl(AppSettings settings, Runnable listener, ProcessCommands commands) { | |||
super("StopRequestWatcherImpl"); | |||
this.settings = settings; | |||
this.commands = commands; | |||
this.scheduler = scheduler; | |||
this.listener = listener; | |||
// safeguard, do not block the JVM if thread is not interrupted | |||
// (method stopWatching() never called). | |||
setDaemon(true); | |||
} | |||
public static StopRequestWatcherImpl create(AppSettings settings, Scheduler scheduler, FileSystem fs) { | |||
public static StopRequestWatcherImpl create(AppSettings settings, Runnable listener, FileSystem fs) { | |||
DefaultProcessCommands commands = DefaultProcessCommands.secondary(fs.getTempDir(), ProcessId.APP.getIpcIndex()); | |||
return new StopRequestWatcherImpl(settings, scheduler, commands); | |||
return new StopRequestWatcherImpl(settings, listener, commands); | |||
} | |||
long getDelayMs() { | |||
@@ -66,7 +65,7 @@ public class StopRequestWatcherImpl extends Thread implements StopRequestWatcher | |||
try { | |||
while (true) { | |||
if (commands.askedForStop()) { | |||
scheduler.terminate(); | |||
listener.run(); | |||
return; | |||
} | |||
Thread.sleep(delayMs); | |||
@@ -86,7 +85,7 @@ public class StopRequestWatcherImpl extends Thread implements StopRequestWatcher | |||
@Override | |||
public void stopWatching() { | |||
// does nothing is not started | |||
// does nothing if not started | |||
interrupt(); | |||
} | |||
} |
@@ -27,7 +27,6 @@ import org.junit.rules.TemporaryFolder; | |||
import org.junit.rules.TestRule; | |||
import org.junit.rules.Timeout; | |||
import org.sonar.application.FileSystem; | |||
import org.sonar.application.Scheduler; | |||
import org.sonar.application.config.AppSettings; | |||
import org.sonar.process.sharedmemoryfile.ProcessCommands; | |||
@@ -50,32 +49,32 @@ public class StopRequestWatcherImplTest { | |||
private AppSettings settings = mock(AppSettings.class, RETURNS_DEEP_STUBS); | |||
private ProcessCommands commands = mock(ProcessCommands.class); | |||
private Scheduler scheduler = mock(Scheduler.class); | |||
private Runnable listener = mock(Runnable.class); | |||
@Test | |||
public void do_not_watch_command_if_disabled() { | |||
enableSetting(false); | |||
StopRequestWatcherImpl underTest = new StopRequestWatcherImpl(settings, scheduler, commands); | |||
StopRequestWatcherImpl underTest = new StopRequestWatcherImpl(settings, listener, commands); | |||
underTest.startWatching(); | |||
assertThat(underTest.isAlive()).isFalse(); | |||
underTest.stopWatching(); | |||
verifyZeroInteractions(commands, scheduler); | |||
verifyZeroInteractions(commands, listener); | |||
} | |||
@Test | |||
public void watch_stop_command_if_enabled() throws Exception { | |||
enableSetting(true); | |||
StopRequestWatcherImpl underTest = new StopRequestWatcherImpl(settings, scheduler, commands); | |||
StopRequestWatcherImpl underTest = new StopRequestWatcherImpl(settings, listener, commands); | |||
underTest.setDelayMs(1L); | |||
underTest.startWatching(); | |||
assertThat(underTest.isAlive()).isTrue(); | |||
verify(scheduler, never()).terminate(); | |||
verify(listener, never()).run(); | |||
when(commands.askedForStop()).thenReturn(true); | |||
verify(scheduler, timeout(1_000L)).terminate(); | |||
verify(listener, timeout(1_000L)).run(); | |||
underTest.stopWatching(); | |||
while (underTest.isAlive()) { | |||
@@ -88,7 +87,7 @@ public class StopRequestWatcherImplTest { | |||
FileSystem fs = mock(FileSystem.class); | |||
when(fs.getTempDir()).thenReturn(temp.newFolder()); | |||
StopRequestWatcherImpl underTest = StopRequestWatcherImpl.create(settings, scheduler, fs); | |||
StopRequestWatcherImpl underTest = StopRequestWatcherImpl.create(settings, listener, fs); | |||
assertThat(underTest.getDelayMs()).isEqualTo(500L); | |||
} | |||
@@ -96,7 +95,7 @@ public class StopRequestWatcherImplTest { | |||
@Test | |||
public void stop_watching_commands_if_thread_is_interrupted() throws Exception { | |||
enableSetting(true); | |||
StopRequestWatcherImpl underTest = new StopRequestWatcherImpl(settings, scheduler, commands); | |||
StopRequestWatcherImpl underTest = new StopRequestWatcherImpl(settings, listener, commands); | |||
underTest.startWatching(); | |||
underTest.interrupt(); |
@@ -19,31 +19,29 @@ | |||
*/ | |||
package org.sonar.process; | |||
import java.util.concurrent.atomic.AtomicBoolean; | |||
/** | |||
* Calls {@link System#exit(int)} except from shutdown hooks, to prevent | |||
* deadlocks. See http://stackoverflow.com/a/19552359/229031 | |||
*/ | |||
public class SystemExit { | |||
private final AtomicBoolean inShutdownHook = new AtomicBoolean(false); | |||
private volatile boolean inShutdownHook = false; | |||
public void exit(int code) { | |||
if (!inShutdownHook.get()) { | |||
if (!inShutdownHook) { | |||
doExit(code); | |||
} | |||
} | |||
public boolean isInShutdownHook() { | |||
return inShutdownHook.get(); | |||
return inShutdownHook; | |||
} | |||
/** | |||
* Declarative approach. I don't know how to get this lifecycle state from Java API. | |||
*/ | |||
public void setInShutdownHook() { | |||
inShutdownHook.set(true); | |||
inShutdownHook = true; | |||
} | |||
void doExit(int code) { |
@@ -38,14 +38,14 @@ public class DefaultProcessCommands implements ProcessCommands { | |||
/** | |||
* Main DefaultProcessCommands will clear the shared memory space of the specified process number when created and will | |||
* then write and/or read to it. | |||
* Therefor there should be only one main DefaultProcessCommands. | |||
* Therefore there should be only one main DefaultProcessCommands. | |||
*/ | |||
public static DefaultProcessCommands main(File directory, int processNumber) { | |||
return new DefaultProcessCommands(directory, processNumber, true); | |||
} | |||
/** | |||
* Secondary DefaultProcessCommands will read and write to the shared memory space but will not clear it. Therefor, there | |||
* Secondary DefaultProcessCommands will read and write to the shared memory space but will not clear it. Therefore, there | |||
* can be any number of them. | |||
*/ | |||
public static DefaultProcessCommands secondary(File directory, int processNumber) { |
@@ -74,7 +74,7 @@ public class App { | |||
scheduler.schedule(); | |||
stopRequestWatcher = StopRequestWatcherImpl.create(settings, scheduler, fileSystem); | |||
stopRequestWatcher = StopRequestWatcherImpl.create(settings, scheduler::terminate, fileSystem); | |||
stopRequestWatcher.startWatching(); | |||
scheduler.awaitTermination(); |