You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

SchedulerImplTest.java 16KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. /*
  2. * SonarQube
  3. * Copyright (C) 2009-2019 SonarSource SA
  4. * mailto:info AT sonarsource DOT com
  5. *
  6. * This program is free software; you can redistribute it and/or
  7. * modify it under the terms of the GNU Lesser General Public
  8. * License as published by the Free Software Foundation; either
  9. * version 3 of the License, or (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. * Lesser General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Lesser General Public License
  17. * along with this program; if not, write to the Free Software Foundation,
  18. * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  19. */
  20. package org.sonar.application;
  import java.io.File;
  import java.io.InputStream;
  import java.util.ArrayList;
  import java.util.EnumMap;
  import java.util.List;
  import java.util.Map;
  import java.util.Random;
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.TimeUnit;
  import org.junit.After;
  import org.junit.Before;
  import org.junit.Rule;
  import org.junit.Test;
  import org.junit.rules.DisableOnDebug;
  import org.junit.rules.ExpectedException;
  import org.junit.rules.TemporaryFolder;
  import org.junit.rules.TestRule;
  import org.junit.rules.Timeout;
  import org.mockito.Mockito;
  import org.sonar.application.command.AbstractCommand;
  import org.sonar.application.command.CommandFactory;
  import org.sonar.application.command.EsScriptCommand;
  import org.sonar.application.command.JavaCommand;
  import org.sonar.application.config.TestAppSettings;
  import org.sonar.application.process.ProcessLauncher;
  import org.sonar.application.process.ProcessMonitor;
  import org.sonar.process.ProcessId;
  import org.sonar.process.cluster.hz.HazelcastMember;

  import static java.util.Collections.synchronizedList;
  import static java.util.Collections.synchronizedMap;
  import static org.apache.commons.lang.RandomStringUtils.randomAlphanumeric;
  import static org.assertj.core.api.Assertions.assertThat;
  import static org.mockito.Mockito.doThrow;
  import static org.mockito.Mockito.mock;
  import static org.sonar.process.ProcessId.COMPUTE_ENGINE;
  import static org.sonar.process.ProcessId.ELASTICSEARCH;
  import static org.sonar.process.ProcessId.WEB_SERVER;
  import static org.sonar.process.ProcessProperties.Property.CLUSTER_ENABLED;
  import static org.sonar.process.ProcessProperties.Property.CLUSTER_NODE_HOST;
  import static org.sonar.process.ProcessProperties.Property.CLUSTER_NODE_NAME;
  import static org.sonar.process.ProcessProperties.Property.CLUSTER_NODE_HZ_PORT;
  import static org.sonar.process.ProcessProperties.Property.CLUSTER_NODE_TYPE;
  61. public class SchedulerImplTest {
  62. @Rule
  63. public TestRule safeguardTimeout = new DisableOnDebug(Timeout.seconds(60));
  64. @Rule
  65. public ExpectedException expectedException = ExpectedException.none();
  66. @Rule
  67. public TemporaryFolder temporaryFolder = new TemporaryFolder();
  68. private EsScriptCommand esScriptCommand;
  69. private JavaCommand webLeaderCommand;
  70. private JavaCommand webFollowerCommand;
  71. private JavaCommand ceCommand;
  72. private AppReloader appReloader = mock(AppReloader.class);
  73. private TestAppSettings settings = new TestAppSettings();
  74. private TestCommandFactory javaCommandFactory = new TestCommandFactory();
  75. private TestProcessLauncher processLauncher = new TestProcessLauncher();
  76. private TestAppState appState = new TestAppState();
  77. private HazelcastMember hazelcastMember = mock(HazelcastMember.class);
  78. private TestClusterAppState clusterAppState = new TestClusterAppState(hazelcastMember);
  79. private List<ProcessId> orderedStops = synchronizedList(new ArrayList<>());
  80. @Before
  81. public void setUp() throws Exception {
  82. File tempDir = temporaryFolder.newFolder();
  83. esScriptCommand = new EsScriptCommand(ELASTICSEARCH, tempDir);
  84. webLeaderCommand = new JavaCommand(WEB_SERVER, tempDir);
  85. webFollowerCommand = new JavaCommand(WEB_SERVER, tempDir);
  86. ceCommand = new JavaCommand(COMPUTE_ENGINE, tempDir);
  87. }
  88. @After
  89. public void tearDown() {
  90. processLauncher.close();
  91. }
  92. @Test
  93. public void start_and_stop_sequence_of_ES_WEB_CE_in_order() throws Exception {
  94. SchedulerImpl underTest = newScheduler(false);
  95. underTest.schedule();
  96. // elasticsearch does not have preconditions to start
  97. TestProcess es = processLauncher.waitForProcess(ELASTICSEARCH);
  98. assertThat(es.isAlive()).isTrue();
  99. assertThat(processLauncher.processes).hasSize(1);
  100. // elasticsearch becomes operational -> web leader is starting
  101. es.operational = true;
  102. waitForAppStateOperational(appState, ELASTICSEARCH);
  103. TestProcess web = processLauncher.waitForProcess(WEB_SERVER);
  104. assertThat(web.isAlive()).isTrue();
  105. assertThat(processLauncher.processes).hasSize(2);
  106. assertThat(processLauncher.commands).containsExactly(esScriptCommand, webLeaderCommand);
  107. // web becomes operational -> CE is starting
  108. web.operational = true;
  109. waitForAppStateOperational(appState, WEB_SERVER);
  110. TestProcess ce = processLauncher.waitForProcess(COMPUTE_ENGINE);
  111. assertThat(ce.isAlive()).isTrue();
  112. assertThat(processLauncher.processes).hasSize(3);
  113. assertThat(processLauncher.commands).containsExactly(esScriptCommand, webLeaderCommand, ceCommand);
  114. // all processes are up
  115. processLauncher.processes.values().forEach(p -> assertThat(p.isAlive()).isTrue());
  116. // processes are stopped in reverse order of startup
  117. underTest.terminate();
  118. assertThat(orderedStops).containsExactly(COMPUTE_ENGINE, WEB_SERVER, ELASTICSEARCH);
  119. processLauncher.processes.values().forEach(p -> assertThat(p.isAlive()).isFalse());
  120. // does nothing because scheduler is already terminated
  121. underTest.awaitTermination();
  122. }
  123. @Test
  124. public void all_processes_are_stopped_if_one_process_goes_down() throws Exception {
  125. Scheduler underTest = startAll();
  126. processLauncher.waitForProcess(WEB_SERVER).destroyForcibly();
  127. underTest.awaitTermination();
  128. assertThat(orderedStops).containsExactly(WEB_SERVER, COMPUTE_ENGINE, ELASTICSEARCH);
  129. processLauncher.processes.values().forEach(p -> assertThat(p.isAlive()).isFalse());
  130. // following does nothing
  131. underTest.terminate();
  132. underTest.awaitTermination();
  133. }
  134. @Test
  135. public void all_processes_are_stopped_if_one_process_fails_to_start() throws Exception {
  136. SchedulerImpl underTest = newScheduler(false);
  137. processLauncher.makeStartupFail = COMPUTE_ENGINE;
  138. underTest.schedule();
  139. processLauncher.waitForProcess(ELASTICSEARCH).operational = true;
  140. processLauncher.waitForProcess(WEB_SERVER).operational = true;
  141. underTest.awaitTermination();
  142. assertThat(orderedStops).containsExactly(WEB_SERVER, ELASTICSEARCH);
  143. processLauncher.processes.values().forEach(p -> assertThat(p.isAlive()).isFalse());
  144. }
  145. @Test
  146. public void terminate_can_be_called_multiple_times() throws Exception {
  147. Scheduler underTest = startAll();
  148. underTest.terminate();
  149. processLauncher.processes.values().forEach(p -> assertThat(p.isAlive()).isFalse());
  150. // does nothing
  151. underTest.terminate();
  152. }
  153. @Test
  154. public void awaitTermination_blocks_until_all_processes_are_stopped() throws Exception {
  155. Scheduler underTest = startAll();
  156. Thread awaitingTermination = new Thread(() -> underTest.awaitTermination());
  157. awaitingTermination.start();
  158. assertThat(awaitingTermination.isAlive()).isTrue();
  159. underTest.terminate();
  160. // the thread is being stopped
  161. awaitingTermination.join();
  162. assertThat(awaitingTermination.isAlive()).isFalse();
  163. }
  164. @Test
  165. public void restart_stops_all_if_new_settings_are_not_allowed() throws Exception {
  166. Scheduler underTest = startAll();
  167. doThrow(new IllegalStateException("reload error")).when(appReloader).reload(settings);
  168. processLauncher.waitForProcess(WEB_SERVER).askedForRestart = true;
  169. // waiting for all processes to be stopped
  170. processLauncher.waitForProcessDown(ELASTICSEARCH);
  171. processLauncher.waitForProcessDown(COMPUTE_ENGINE);
  172. processLauncher.waitForProcessDown(WEB_SERVER);
  173. // verify that awaitTermination() does not block
  174. underTest.awaitTermination();
  175. }
  176. @Test
  177. public void search_node_starts_only_elasticsearch() throws Exception {
  178. settings.set(CLUSTER_ENABLED.getKey(), "true");
  179. settings.set(CLUSTER_NODE_TYPE.getKey(), "search");
  180. addRequiredNodeProperties();
  181. SchedulerImpl underTest = newScheduler(true);
  182. underTest.schedule();
  183. processLauncher.waitForProcessAlive(ProcessId.ELASTICSEARCH);
  184. assertThat(processLauncher.processes).hasSize(1);
  185. underTest.terminate();
  186. }
  187. @Test
  188. public void application_node_starts_only_web_and_ce() throws Exception {
  189. clusterAppState.setOperational(ProcessId.ELASTICSEARCH);
  190. settings.set(CLUSTER_ENABLED.getKey(), "true");
  191. settings.set(CLUSTER_NODE_TYPE.getKey(), "application");
  192. SchedulerImpl underTest = newScheduler(true);
  193. underTest.schedule();
  194. TestProcess web = processLauncher.waitForProcessAlive(WEB_SERVER);
  195. web.operational = true;
  196. processLauncher.waitForProcessAlive(COMPUTE_ENGINE);
  197. assertThat(processLauncher.processes).hasSize(2);
  198. underTest.terminate();
  199. }
  200. @Test
  201. public void search_node_starts_even_if_web_leader_is_not_yet_operational() throws Exception {
  202. // leader takes the lock, so underTest won't get it
  203. assertThat(clusterAppState.tryToLockWebLeader()).isTrue();
  204. clusterAppState.setOperational(ProcessId.ELASTICSEARCH);
  205. settings.set(CLUSTER_ENABLED.getKey(), "true");
  206. settings.set(CLUSTER_NODE_TYPE.getKey(), "search");
  207. addRequiredNodeProperties();
  208. SchedulerImpl underTest = newScheduler(true);
  209. underTest.schedule();
  210. processLauncher.waitForProcessAlive(ProcessId.ELASTICSEARCH);
  211. assertThat(processLauncher.processes).hasSize(1);
  212. underTest.terminate();
  213. }
  214. @Test
  215. public void web_follower_starts_only_when_web_leader_is_operational() throws Exception {
  216. // leader takes the lock, so underTest won't get it
  217. assertThat(clusterAppState.tryToLockWebLeader()).isTrue();
  218. clusterAppState.setOperational(ProcessId.ELASTICSEARCH);
  219. settings.set(CLUSTER_ENABLED.getKey(), "true");
  220. settings.set(CLUSTER_NODE_TYPE.getKey(), "application");
  221. SchedulerImpl underTest = newScheduler(true);
  222. underTest.schedule();
  223. assertThat(processLauncher.processes).hasSize(0);
  224. // leader becomes operational -> follower can start
  225. clusterAppState.setOperational(WEB_SERVER);
  226. processLauncher.waitForProcessAlive(WEB_SERVER);
  227. processLauncher.waitForProcessAlive(COMPUTE_ENGINE);
  228. assertThat(processLauncher.processes).hasSize(2);
  229. underTest.terminate();
  230. }
  231. @Test
  232. public void web_server_waits_for_remote_elasticsearch_to_be_started_if_local_es_is_disabled() throws Exception {
  233. settings.set(CLUSTER_ENABLED.getKey(), "true");
  234. settings.set(CLUSTER_NODE_TYPE.getKey(), "application");
  235. SchedulerImpl underTest = newScheduler(true);
  236. underTest.schedule();
  237. // WEB and CE wait for ES to be up
  238. assertThat(processLauncher.processes).isEmpty();
  239. // ES becomes operational on another node -> web leader can start
  240. clusterAppState.setRemoteOperational(ProcessId.ELASTICSEARCH);
  241. processLauncher.waitForProcessAlive(WEB_SERVER);
  242. assertThat(processLauncher.processes).hasSize(1);
  243. underTest.terminate();
  244. }
  245. private SchedulerImpl newScheduler(boolean clustered) {
  246. return new SchedulerImpl(settings, appReloader, javaCommandFactory, processLauncher, clustered ? clusterAppState : appState)
  247. .setProcessWatcherDelayMs(1L);
  248. }
  249. private Scheduler startAll() throws InterruptedException {
  250. SchedulerImpl scheduler = newScheduler(false);
  251. scheduler.schedule();
  252. processLauncher.waitForProcess(ELASTICSEARCH).operational = true;
  253. processLauncher.waitForProcess(WEB_SERVER).operational = true;
  254. processLauncher.waitForProcess(COMPUTE_ENGINE).operational = true;
  255. return scheduler;
  256. }
  257. private static void waitForAppStateOperational(AppState appState, ProcessId id) throws InterruptedException {
  258. while (true) {
  259. if (appState.isOperational(id, true)) {
  260. return;
  261. }
  262. Thread.sleep(1L);
  263. }
  264. }
  265. private void addRequiredNodeProperties() {
  266. settings.set(CLUSTER_NODE_NAME.getKey(), randomAlphanumeric(4));
  267. settings.set(CLUSTER_NODE_HOST.getKey(), randomAlphanumeric(4));
  268. settings.set(CLUSTER_NODE_HZ_PORT.getKey(), String.valueOf(1 + new Random().nextInt(999)));
  269. }
  270. private class TestCommandFactory implements CommandFactory {
  271. @Override
  272. public EsScriptCommand createEsCommand() {
  273. return esScriptCommand;
  274. }
  275. @Override
  276. public JavaCommand createWebCommand(boolean leader) {
  277. return leader ? webLeaderCommand : webFollowerCommand;
  278. }
  279. @Override
  280. public JavaCommand createCeCommand() {
  281. return ceCommand;
  282. }
  283. }
  284. private class TestProcessLauncher implements ProcessLauncher {
  285. private final EnumMap<ProcessId, TestProcess> processes = new EnumMap<>(ProcessId.class);
  286. private final List<AbstractCommand<?>> commands = synchronizedList(new ArrayList<>());
  287. private ProcessId makeStartupFail = null;
  288. @Override
  289. public ProcessMonitor launch(AbstractCommand command) {
  290. return launchImpl(command);
  291. }
  292. private ProcessMonitor launchImpl(AbstractCommand<?> javaCommand) {
  293. commands.add(javaCommand);
  294. if (makeStartupFail == javaCommand.getProcessId()) {
  295. throw new IllegalStateException("cannot start " + javaCommand.getProcessId());
  296. }
  297. TestProcess process = new TestProcess(javaCommand.getProcessId());
  298. processes.put(javaCommand.getProcessId(), process);
  299. return process;
  300. }
  301. private TestProcess waitForProcess(ProcessId id) throws InterruptedException {
  302. while (true) {
  303. TestProcess p = processes.get(id);
  304. if (p != null) {
  305. return p;
  306. }
  307. Thread.sleep(1L);
  308. }
  309. }
  310. private TestProcess waitForProcessAlive(ProcessId id) throws InterruptedException {
  311. while (true) {
  312. TestProcess p = processes.get(id);
  313. if (p != null && p.isAlive()) {
  314. return p;
  315. }
  316. Thread.sleep(1L);
  317. }
  318. }
  319. private TestProcess waitForProcessDown(ProcessId id) throws InterruptedException {
  320. while (true) {
  321. TestProcess p = processes.get(id);
  322. if (p != null && !p.isAlive()) {
  323. return p;
  324. }
  325. Thread.sleep(1L);
  326. }
  327. }
  328. @Override
  329. public void close() {
  330. for (TestProcess process : processes.values()) {
  331. process.destroyForcibly();
  332. }
  333. }
  334. }
  335. private class TestProcess implements ProcessMonitor, AutoCloseable {
  336. private final ProcessId processId;
  337. private final CountDownLatch alive = new CountDownLatch(1);
  338. private boolean operational = false;
  339. private boolean askedForRestart = false;
  340. private TestProcess(ProcessId processId) {
  341. this.processId = processId;
  342. }
  343. @Override
  344. public InputStream getInputStream() {
  345. return mock(InputStream.class, Mockito.RETURNS_MOCKS);
  346. }
  347. @Override
  348. public InputStream getErrorStream() {
  349. return mock(InputStream.class, Mockito.RETURNS_MOCKS);
  350. }
  351. @Override
  352. public void closeStreams() {
  353. }
  354. @Override
  355. public boolean isAlive() {
  356. return alive.getCount() == 1;
  357. }
  358. @Override
  359. public void askForStop() {
  360. destroyForcibly();
  361. }
  362. @Override
  363. public void destroyForcibly() {
  364. if (isAlive()) {
  365. orderedStops.add(processId);
  366. }
  367. alive.countDown();
  368. }
  369. @Override
  370. public void waitFor() throws InterruptedException {
  371. alive.await();
  372. }
  373. @Override
  374. public void waitFor(long timeout, TimeUnit timeoutUnit) throws InterruptedException {
  375. alive.await(timeout, timeoutUnit);
  376. }
  377. @Override
  378. public boolean isOperational() {
  379. return operational;
  380. }
  381. @Override
  382. public boolean askedForRestart() {
  383. return askedForRestart;
  384. }
  385. @Override
  386. public void acknowledgeAskForRestart() {
  387. this.askedForRestart = false;
  388. }
  389. @Override
  390. public void close() {
  391. alive.countDown();
  392. }
  393. }
  394. }