You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

CeQueueImplIT.java 30KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666
  1. /*
  2. * SonarQube
  3. * Copyright (C) 2009-2024 SonarSource SA
  4. * mailto:info AT sonarsource DOT com
  5. *
  6. * This program is free software; you can redistribute it and/or
  7. * modify it under the terms of the GNU Lesser General Public
  8. * License as published by the Free Software Foundation; either
  9. * version 3 of the License, or (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. * Lesser General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Lesser General Public License
  17. * along with this program; if not, write to the Free Software Foundation,
  18. * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  19. */
  20. package org.sonar.ce.queue;
  21. import java.util.List;
  22. import java.util.Optional;
  23. import java.util.Random;
  24. import java.util.stream.IntStream;
  25. import javax.annotation.Nullable;
  26. import org.junit.Rule;
  27. import org.junit.Test;
  28. import org.sonar.api.impl.utils.TestSystem2;
  29. import org.sonar.api.utils.System2;
  30. import org.sonar.ce.queue.CeTaskSubmit.Component;
  31. import org.sonar.ce.task.CeTask;
  32. import org.sonar.core.util.SequenceUuidFactory;
  33. import org.sonar.core.util.UuidFactory;
  34. import org.sonar.core.util.UuidFactoryFast;
  35. import org.sonar.db.DbSession;
  36. import org.sonar.db.DbTester;
  37. import org.sonar.db.ce.CeActivityDto;
  38. import org.sonar.db.ce.CeQueueDto;
  39. import org.sonar.db.ce.CeTaskTypes;
  40. import org.sonar.db.component.BranchDto;
  41. import org.sonar.db.component.ComponentDto;
  42. import org.sonar.db.component.ProjectData;
  43. import org.sonar.db.entity.EntityDto;
  44. import org.sonar.db.project.ProjectDto;
  45. import org.sonar.db.user.UserDto;
  46. import org.sonar.db.user.UserTesting;
  47. import org.sonar.server.platform.NodeInformation;
  48. import static com.google.common.collect.ImmutableList.of;
  49. import static java.util.Arrays.asList;
  50. import static java.util.Collections.emptyMap;
  51. import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
  52. import static org.assertj.core.api.Assertions.assertThat;
  53. import static org.assertj.core.api.Assertions.assertThatThrownBy;
  54. import static org.assertj.core.api.Assertions.catchThrowable;
  55. import static org.assertj.core.api.Assertions.tuple;
  56. import static org.mockito.Mockito.mock;
  57. import static org.mockito.Mockito.when;
  58. import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_ENTITY;
  59. import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_TASK_TYPE;
  60. public class CeQueueImplIT {
  61. private static final String WORKER_UUID = "workerUuid";
  62. private static final long NOW = 1_450_000_000_000L;
  63. private static final String NODE_NAME = "nodeName1";
  64. private System2 system2 = new TestSystem2().setNow(NOW);
  65. @Rule
  66. public DbTester db = DbTester.create(system2);
  67. private DbSession session = db.getSession();
  68. private UuidFactory uuidFactory = new SequenceUuidFactory();
  69. private NodeInformation nodeInformation = mock(NodeInformation.class);
  70. private CeQueue underTest = new CeQueueImpl(system2, db.getDbClient(), uuidFactory, nodeInformation);
  71. @Test
  72. public void submit_returns_task_populated_from_CeTaskSubmit_and_creates_CeQueue_row() {
  73. String componentUuid = randomAlphabetic(3);
  74. String mainComponentUuid = randomAlphabetic(4);
  75. CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, new Component(componentUuid, mainComponentUuid), "submitter uuid");
  76. UserDto userDto = db.getDbClient().userDao().selectByUuid(db.getSession(), taskSubmit.getSubmitterUuid());
  77. CeTask task = underTest.submit(taskSubmit);
  78. verifyCeTask(taskSubmit, task, null, null, userDto);
  79. verifyCeQueueDtoForTaskSubmit(taskSubmit);
  80. }
  81. @Test
  82. public void submit_populates_component_name_and_key_of_CeTask_if_component_exists() {
  83. ProjectData projectData = db.components().insertPrivateProject("PROJECT_1");
  84. CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, Component.fromDto(projectData.getMainBranchDto()), null);
  85. CeTask task = underTest.submit(taskSubmit);
  86. verifyCeTask(taskSubmit, task, projectData.getProjectDto(), projectData.getMainBranchDto(), null);
  87. }
  88. @Test
  89. public void submit_returns_task_without_component_info_when_submit_has_none() {
  90. CeTaskSubmit taskSubmit = createTaskSubmit("not cpt related");
  91. CeTask task = underTest.submit(taskSubmit);
  92. verifyCeTask(taskSubmit, task, null, null, null);
  93. }
  94. @Test
  95. public void submit_populates_submitter_login_of_CeTask_if_submitter_exists() {
  96. UserDto userDto = insertUser(UserTesting.newUserDto());
  97. CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, null, userDto.getUuid());
  98. CeTask task = underTest.submit(taskSubmit);
  99. verifyCeTask(taskSubmit, task, null, null, userDto);
  100. }
  101. @Test
  102. public void submit_with_UNIQUE_QUEUE_PER_MAIN_COMPONENT_creates_task_without_component_when_there_is_a_pending_task_without_component() {
  103. CeTaskSubmit taskSubmit = createTaskSubmit("no_component");
  104. CeQueueDto dto = insertPendingInQueue(null);
  105. Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_ENTITY);
  106. assertThat(task).isNotEmpty();
  107. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  108. .extracting(CeQueueDto::getUuid)
  109. .containsOnly(dto.getUuid(), task.get().getUuid());
  110. }
  111. @Test
  112. public void submit_with_UNIQUE_QUEUE_PER_MAIN_COMPONENT_creates_task_when_there_is_a_pending_task_for_another_main_component() {
  113. String mainComponentUuid = randomAlphabetic(5);
  114. String otherMainComponentUuid = randomAlphabetic(6);
  115. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  116. CeQueueDto dto = insertPendingInQueue(newComponent(otherMainComponentUuid));
  117. Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_ENTITY);
  118. assertThat(task).isNotEmpty();
  119. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  120. .extracting(CeQueueDto::getUuid)
  121. .containsOnly(dto.getUuid(), task.get().getUuid());
  122. }
  123. @Test
  124. public void submit_with_UNIQUE_QUEUE_PER_MAIN_COMPONENT_does_not_create_task_when_there_is_one_pending_task_for_same_main_component() {
  125. String mainComponentUuid = randomAlphabetic(5);
  126. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  127. CeQueueDto dto = insertPendingInQueue(newComponent(mainComponentUuid));
  128. Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_ENTITY);
  129. assertThat(task).isEmpty();
  130. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  131. .extracting(CeQueueDto::getUuid)
  132. .containsOnly(dto.getUuid());
  133. }
  134. @Test
  135. public void submit_with_UNIQUE_QUEUE_PER_MAIN_COMPONENT_does_not_create_task_when_there_is_many_pending_task_for_same_main_component() {
  136. String mainComponentUuid = randomAlphabetic(5);
  137. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  138. String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5))
  139. .mapToObj(i -> insertPendingInQueue(newComponent(mainComponentUuid)))
  140. .map(CeQueueDto::getUuid)
  141. .toArray(String[]::new);
  142. Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_ENTITY);
  143. assertThat(task).isEmpty();
  144. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  145. .extracting(CeQueueDto::getUuid)
  146. .containsOnly(uuids);
  147. }
  148. @Test
  149. public void submit_without_UNIQUE_QUEUE_PER_MAIN_COMPONENT_creates_task_when_there_is_one_pending_task_for_same_main_component() {
  150. String mainComponentUuid = randomAlphabetic(5);
  151. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  152. CeQueueDto dto = insertPendingInQueue(newComponent(mainComponentUuid));
  153. CeTask task = underTest.submit(taskSubmit);
  154. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  155. .extracting(CeQueueDto::getUuid)
  156. .containsOnly(dto.getUuid(), task.getUuid());
  157. }
  158. @Test
  159. public void submit_without_UNIQUE_QUEUE_PER_MAIN_COMPONENT_creates_task_when_there_is_many_pending_task_for_same_main_component() {
  160. String mainComponentUuid = randomAlphabetic(5);
  161. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  162. String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5))
  163. .mapToObj(i -> insertPendingInQueue(newComponent(mainComponentUuid)))
  164. .map(CeQueueDto::getUuid)
  165. .toArray(String[]::new);
  166. CeTask task = underTest.submit(taskSubmit);
  167. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  168. .extracting(CeQueueDto::getUuid)
  169. .hasSize(uuids.length + 1)
  170. .contains(uuids)
  171. .contains(task.getUuid());
  172. }
  173. @Test
  174. public void submit_with_UNIQUE_QUEUE_PER_TASK_TYPE_does_not_create_task_when_there_is_a_task_with_the_same_type() {
  175. String mainComponentUuid = randomAlphabetic(5);
  176. CeTaskSubmit taskSubmit = createTaskSubmit("some type", newComponent(mainComponentUuid), null);
  177. String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5))
  178. .mapToObj(i -> insertPendingInQueue(newComponent(mainComponentUuid)))
  179. .map(CeQueueDto::getUuid)
  180. .toArray(String[]::new);
  181. Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_TASK_TYPE);
  182. assertThat(task).isEmpty();
  183. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  184. .extracting(CeQueueDto::getUuid)
  185. .containsOnly(uuids);
  186. }
  187. @Test
  188. public void massSubmit_returns_tasks_for_each_CeTaskSubmit_populated_from_CeTaskSubmit_and_creates_CeQueue_row_for_each() {
  189. String mainComponentUuid = randomAlphabetic(10);
  190. CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, newComponent(mainComponentUuid), "submitter uuid");
  191. CeTaskSubmit taskSubmit2 = createTaskSubmit("some type");
  192. UserDto userDto1 = db.getDbClient().userDao().selectByUuid(db.getSession(), taskSubmit1.getSubmitterUuid());
  193. List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2));
  194. assertThat(tasks).hasSize(2);
  195. verifyCeTask(taskSubmit1, tasks.get(0), null, null, userDto1);
  196. verifyCeTask(taskSubmit2, tasks.get(1), null, null, null);
  197. verifyCeQueueDtoForTaskSubmit(taskSubmit1);
  198. verifyCeQueueDtoForTaskSubmit(taskSubmit2);
  199. }
  200. @Test
  201. public void massSubmit_populates_component_name_and_key_of_CeTask_if_project_exists() {
  202. ProjectData projectData = db.components().insertPrivateProject("PROJECT_1");
  203. CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, Component.fromDto(projectData.getMainBranchDto()), null);
  204. CeTaskSubmit taskSubmit2 = createTaskSubmit("something", newComponent(randomAlphabetic(12)), null);
  205. List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2));
  206. assertThat(tasks).hasSize(2);
  207. verifyCeTask(taskSubmit1, tasks.get(0), projectData.getProjectDto(), projectData.getMainBranchDto(), null);
  208. verifyCeTask(taskSubmit2, tasks.get(1), null, null, null);
  209. }
  210. @Test
  211. public void massSubmit_populates_component_name_and_key_of_CeTask_if_project_and_branch_exists() {
  212. ProjectDto project = db.components().insertPublicProject(p -> p.setUuid("PROJECT_1").setBranchUuid("PROJECT_1")).getProjectDto();
  213. BranchDto branch1 = db.components().insertProjectBranch(project);
  214. BranchDto branch2 = db.components().insertProjectBranch(project);
  215. CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, Component.fromDto(branch1.getUuid(), project.getUuid()), null);
  216. CeTaskSubmit taskSubmit2 = createTaskSubmit("something", Component.fromDto(branch2.getUuid(), project.getUuid()), null);
  217. List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2));
  218. assertThat(tasks).hasSize(2);
  219. verifyCeTask(taskSubmit1, tasks.get(0), project, branch1, null);
  220. verifyCeTask(taskSubmit2, tasks.get(1), project, branch2, null);
  221. }
  222. @Test
  223. public void massSubmit_with_UNIQUE_QUEUE_PER_MAIN_COMPONENT_creates_task_without_component_when_there_is_a_pending_task_without_component() {
  224. CeTaskSubmit taskSubmit = createTaskSubmit("no_component");
  225. CeQueueDto dto = insertPendingInQueue(null);
  226. List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_ENTITY);
  227. assertThat(tasks).hasSize(1);
  228. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  229. .extracting(CeQueueDto::getUuid)
  230. .containsOnly(dto.getUuid(), tasks.iterator().next().getUuid());
  231. }
  232. @Test
  233. public void massSubmit_with_UNIQUE_QUEUE_PER_MAIN_COMPONENT_creates_task_when_there_is_a_pending_task_for_another_main_component() {
  234. String mainComponentUuid = randomAlphabetic(5);
  235. String otherMainComponentUuid = randomAlphabetic(6);
  236. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  237. CeQueueDto dto = insertPendingInQueue(newComponent(otherMainComponentUuid));
  238. List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_ENTITY);
  239. assertThat(tasks).hasSize(1);
  240. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  241. .extracting(CeQueueDto::getUuid)
  242. .containsOnly(dto.getUuid(), tasks.iterator().next().getUuid());
  243. }
  244. @Test
  245. public void massSubmit_with_UNIQUE_QUEUE_PER_MAIN_COMPONENT_does_not_create_task_when_there_is_one_pending_task_for_same_main_component() {
  246. String mainComponentUuid = randomAlphabetic(5);
  247. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  248. CeQueueDto dto = insertPendingInQueue(newComponent(mainComponentUuid));
  249. List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_ENTITY);
  250. assertThat(tasks).isEmpty();
  251. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  252. .extracting(CeQueueDto::getUuid)
  253. .containsOnly(dto.getUuid());
  254. }
  255. @Test
  256. public void massSubmit_with_UNIQUE_QUEUE_PER_MAIN_COMPONENT_does_not_create_task_when_there_is_many_pending_task_for_same_main_component() {
  257. String mainComponentUuid = randomAlphabetic(5);
  258. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  259. String[] uuids = IntStream.range(0, 7)
  260. .mapToObj(i -> insertPendingInQueue(newComponent(mainComponentUuid)))
  261. .map(CeQueueDto::getUuid)
  262. .toArray(String[]::new);
  263. List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_ENTITY);
  264. assertThat(tasks).isEmpty();
  265. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  266. .extracting(CeQueueDto::getUuid)
  267. .containsOnly(uuids);
  268. }
  269. @Test
  270. public void massSubmit_without_UNIQUE_QUEUE_PER_MAIN_COMPONENT_creates_task_when_there_is_one_pending_task_for_other_main_component() {
  271. String mainComponentUuid = randomAlphabetic(5);
  272. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  273. CeQueueDto dto = insertPendingInQueue(newComponent(mainComponentUuid));
  274. List<CeTask> tasks = underTest.massSubmit(of(taskSubmit));
  275. assertThat(tasks).hasSize(1);
  276. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  277. .extracting(CeQueueDto::getUuid)
  278. .containsOnly(dto.getUuid(), tasks.iterator().next().getUuid());
  279. }
  280. @Test
  281. public void massSubmit_without_UNIQUE_QUEUE_PER_MAIN_COMPONENT_creates_task_when_there_is_many_pending_task_for_other_main_component() {
  282. String mainComponentUuid = randomAlphabetic(5);
  283. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  284. String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5))
  285. .mapToObj(i -> insertPendingInQueue(newComponent(mainComponentUuid)))
  286. .map(CeQueueDto::getUuid)
  287. .toArray(String[]::new);
  288. List<CeTask> tasks = underTest.massSubmit(of(taskSubmit));
  289. assertThat(tasks).hasSize(1);
  290. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  291. .extracting(CeQueueDto::getUuid)
  292. .hasSize(uuids.length + 1)
  293. .contains(uuids)
  294. .contains(tasks.iterator().next().getUuid());
  295. }
  296. @Test
  297. public void massSubmit_with_UNIQUE_QUEUE_PER_MAIN_COMPONENT_creates_tasks_depending_on_whether_there_is_pending_task_for_same_main_component() {
  298. String mainComponentUuid1 = randomAlphabetic(5);
  299. String mainComponentUuid2 = randomAlphabetic(6);
  300. String mainComponentUuid3 = randomAlphabetic(7);
  301. String mainComponentUuid4 = randomAlphabetic(8);
  302. String mainComponentUuid5 = randomAlphabetic(9);
  303. CeTaskSubmit taskSubmit1 = createTaskSubmit("with_one_pending", newComponent(mainComponentUuid1), null);
  304. CeQueueDto dto1 = insertPendingInQueue(newComponent(mainComponentUuid1));
  305. Component componentForMainComponentUuid2 = newComponent(mainComponentUuid2);
  306. CeTaskSubmit taskSubmit2 = createTaskSubmit("no_pending", componentForMainComponentUuid2, null);
  307. CeTaskSubmit taskSubmit3 = createTaskSubmit("with_many_pending", newComponent(mainComponentUuid3), null);
  308. String[] uuids3 = IntStream.range(0, 2 + new Random().nextInt(5))
  309. .mapToObj(i -> insertPendingInQueue(newComponent(mainComponentUuid3)))
  310. .map(CeQueueDto::getUuid)
  311. .toArray(String[]::new);
  312. Component componentForMainComponentUuid4 = newComponent(mainComponentUuid4);
  313. CeTaskSubmit taskSubmit4 = createTaskSubmit("no_pending_2", componentForMainComponentUuid4, null);
  314. CeTaskSubmit taskSubmit5 = createTaskSubmit("with_pending_2", newComponent(mainComponentUuid5), null);
  315. CeQueueDto dto5 = insertPendingInQueue(newComponent(mainComponentUuid5));
  316. List<CeTask> tasks = underTest.massSubmit(of(taskSubmit1, taskSubmit2, taskSubmit3, taskSubmit4, taskSubmit5), UNIQUE_QUEUE_PER_ENTITY);
  317. assertThat(tasks)
  318. .hasSize(2)
  319. .extracting(task -> task.getComponent().get().getUuid(), task -> task.getEntity().get().getUuid())
  320. .containsOnly(tuple(componentForMainComponentUuid2.getUuid(), componentForMainComponentUuid2.getEntityUuid()),
  321. tuple(componentForMainComponentUuid4.getUuid(), componentForMainComponentUuid4.getEntityUuid()));
  322. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  323. .extracting(CeQueueDto::getUuid)
  324. .hasSize(1 + uuids3.length + 1 + tasks.size())
  325. .contains(dto1.getUuid())
  326. .contains(uuids3)
  327. .contains(dto5.getUuid())
  328. .containsAll(tasks.stream().map(CeTask::getUuid).toList());
  329. }
  330. @Test
  331. public void cancel_pending() {
  332. CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  333. CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get();
  334. underTest.cancel(db.getSession(), queueDto);
  335. Optional<CeActivityDto> activity = findCeActivityDtoInDb(task);
  336. assertThat(activity).isPresent();
  337. assertThat(activity.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
  338. }
  339. @Test
  340. public void cancel_pending_whenNodeNameProvided_setItInCeActivity() {
  341. when(nodeInformation.getNodeName()).thenReturn(Optional.of(NODE_NAME));
  342. CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  343. CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get();
  344. underTest.cancel(db.getSession(), queueDto);
  345. Optional<CeActivityDto> activity = findCeActivityDtoInDb(task);
  346. assertThat(activity).isPresent();
  347. assertThat(activity.get().getNodeName()).isEqualTo(NODE_NAME);
  348. }
  349. @Test
  350. public void cancel_pending_whenNodeNameNOtProvided_setNulInCeActivity() {
  351. when(nodeInformation.getNodeName()).thenReturn(Optional.empty());
  352. CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  353. CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get();
  354. underTest.cancel(db.getSession(), queueDto);
  355. Optional<CeActivityDto> activity = findCeActivityDtoInDb(task);
  356. assertThat(activity).isPresent();
  357. assertThat(activity.get().getNodeName()).isNull();
  358. }
  359. @Test
  360. public void fail_to_cancel_if_in_progress() {
  361. CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(11)));
  362. CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().tryToPeek(session, task.getUuid(), WORKER_UUID).get();
  363. assertThatThrownBy(() -> underTest.cancel(db.getSession(), ceQueueDto))
  364. .isInstanceOf(IllegalStateException.class)
  365. .hasMessageStartingWith("Task is in progress and can't be canceled");
  366. }
  367. @Test
  368. public void cancelAll_pendings_but_not_in_progress() {
  369. CeTask inProgressTask = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  370. CeTask pendingTask1 = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  371. CeTask pendingTask2 = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  372. db.getDbClient().ceQueueDao().tryToPeek(session, inProgressTask.getUuid(), WORKER_UUID);
  373. int canceledCount = underTest.cancelAll();
  374. assertThat(canceledCount).isEqualTo(2);
  375. Optional<CeActivityDto> ceActivityInProgress = findCeActivityDtoInDb(pendingTask1);
  376. assertThat(ceActivityInProgress.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
  377. Optional<CeActivityDto> ceActivityPending1 = findCeActivityDtoInDb(pendingTask2);
  378. assertThat(ceActivityPending1.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
  379. Optional<CeActivityDto> ceActivityPending2 = findCeActivityDtoInDb(inProgressTask);
  380. assertThat(ceActivityPending2).isNotPresent();
  381. }
  382. @Test
  383. public void pauseWorkers_marks_workers_as_paused_if_zero_tasks_in_progress() {
  384. submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  385. // task is pending
  386. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
  387. underTest.pauseWorkers();
  388. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSED);
  389. }
  390. @Test
  391. public void pauseWorkers_marks_workers_as_pausing_if_some_tasks_in_progress() {
  392. CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  393. db.getDbClient().ceQueueDao().tryToPeek(session, task.getUuid(), WORKER_UUID);
  394. // task is in-progress
  395. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
  396. underTest.pauseWorkers();
  397. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSING);
  398. }
  399. @Test
  400. public void resumeWorkers_does_nothing_if_not_paused() {
  401. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
  402. underTest.resumeWorkers();
  403. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
  404. }
  405. @Test
  406. public void resumeWorkers_resumes_pausing_workers() {
  407. CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  408. db.getDbClient().ceQueueDao().tryToPeek(session, task.getUuid(), WORKER_UUID);
  409. // task is in-progress
  410. underTest.pauseWorkers();
  411. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSING);
  412. underTest.resumeWorkers();
  413. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
  414. }
  415. @Test
  416. public void resumeWorkers_resumes_paused_workers() {
  417. underTest.pauseWorkers();
  418. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSED);
  419. underTest.resumeWorkers();
  420. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
  421. }
  422. @Test
  423. public void fail_in_progress_task() {
  424. CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  425. CeQueueDto queueDto = db.getDbClient().ceQueueDao().tryToPeek(db.getSession(), task.getUuid(), WORKER_UUID).get();
  426. underTest.fail(db.getSession(), queueDto, "TIMEOUT", "Failed on timeout");
  427. Optional<CeActivityDto> activity = findCeActivityDtoInDb(task);
  428. assertThat(activity).isPresent();
  429. assertThat(activity.get().getStatus()).isEqualTo(CeActivityDto.Status.FAILED);
  430. assertThat(activity.get().getErrorType()).isEqualTo("TIMEOUT");
  431. assertThat(activity.get().getErrorMessage()).isEqualTo("Failed on timeout");
  432. assertThat(activity.get().getExecutedAt()).isEqualTo(NOW);
  433. assertThat(activity.get().getWorkerUuid()).isEqualTo(WORKER_UUID);
  434. assertThat(activity.get().getNodeName()).isNull();
  435. }
  436. @Test
  437. public void fail_in_progress_task_whenNodeNameProvided_setsItInCeActivityDto() {
  438. when(nodeInformation.getNodeName()).thenReturn(Optional.of(NODE_NAME));
  439. CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  440. CeQueueDto queueDto = db.getDbClient().ceQueueDao().tryToPeek(db.getSession(), task.getUuid(), WORKER_UUID).get();
  441. underTest.fail(db.getSession(), queueDto, "TIMEOUT", "Failed on timeout");
  442. Optional<CeActivityDto> activity = findCeActivityDtoInDb(task);
  443. assertThat(activity).isPresent();
  444. assertThat(activity.get().getNodeName()).isEqualTo(NODE_NAME);
  445. }
  446. private Optional<CeActivityDto> findCeActivityDtoInDb(CeTask task) {
  447. return db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid());
  448. }
  449. @Test
  450. public void fail_throws_exception_if_task_is_pending() {
  451. CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  452. CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get();
  453. Throwable thrown = catchThrowable(() -> underTest.fail(db.getSession(), queueDto, "TIMEOUT", "Failed on timeout"));
  454. assertThat(thrown)
  455. .isInstanceOf(IllegalStateException.class)
  456. .hasMessage("Task is not in-progress and can't be marked as failed [uuid=" + task.getUuid() + "]");
  457. }
  458. private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable EntityDto entityDto, @Nullable BranchDto branch, @Nullable UserDto userDto) {
  459. assertThat(task.getUuid()).isEqualTo(taskSubmit.getUuid());
  460. if (branch != null) {
  461. CeTask.Component component = task.getComponent().get();
  462. assertThat(component.getUuid()).isEqualTo(branch.getUuid());
  463. assertThat(component.getKey()).contains(entityDto.getKey());
  464. assertThat(component.getName()).contains(entityDto.getName());
  465. } else if (taskSubmit.getComponent().isPresent()) {
  466. assertThat(task.getComponent()).contains(new CeTask.Component(taskSubmit.getComponent().get().getUuid(), null, null));
  467. } else {
  468. assertThat(task.getComponent()).isEmpty();
  469. }
  470. if (entityDto != null) {
  471. CeTask.Component component = task.getEntity().get();
  472. assertThat(component.getUuid()).isEqualTo(entityDto.getUuid());
  473. assertThat(component.getKey()).contains(entityDto.getKey());
  474. assertThat(component.getName()).contains(entityDto.getName());
  475. } else if (taskSubmit.getComponent().isPresent()) {
  476. assertThat(task.getEntity()).contains(new CeTask.Component(taskSubmit.getComponent().get().getEntityUuid(), null, null));
  477. } else {
  478. assertThat(task.getEntity()).isEmpty();
  479. }
  480. assertThat(task.getType()).isEqualTo(taskSubmit.getType());
  481. if (taskSubmit.getSubmitterUuid() != null) {
  482. if (userDto == null) {
  483. assertThat(task.getSubmitter().uuid()).isEqualTo(taskSubmit.getSubmitterUuid());
  484. assertThat(task.getSubmitter().login()).isNull();
  485. } else {
  486. assertThat(task.getSubmitter().uuid()).isEqualTo(userDto.getUuid()).isEqualTo(taskSubmit.getSubmitterUuid());
  487. assertThat(task.getSubmitter().login()).isEqualTo(userDto.getLogin());
  488. }
  489. }
  490. }
  491. private void verifyCeQueueDtoForTaskSubmit(CeTaskSubmit taskSubmit) {
  492. Optional<CeQueueDto> queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), taskSubmit.getUuid());
  493. assertThat(queueDto).isPresent();
  494. assertThat(queueDto.get().getTaskType()).isEqualTo(taskSubmit.getType());
  495. Optional<Component> component = taskSubmit.getComponent();
  496. if (component.isPresent()) {
  497. assertThat(queueDto.get().getComponentUuid()).isEqualTo(component.get().getUuid());
  498. assertThat(queueDto.get().getEntityUuid()).isEqualTo(component.get().getEntityUuid());
  499. } else {
  500. assertThat(queueDto.get().getComponentUuid()).isNull();
  501. assertThat(queueDto.get().getComponentUuid()).isNull();
  502. }
  503. assertThat(queueDto.get().getSubmitterUuid()).isEqualTo(taskSubmit.getSubmitterUuid());
  504. assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L);
  505. }
  506. private CeTask submit(String reportType, Component component) {
  507. return underTest.submit(createTaskSubmit(reportType, component, null));
  508. }
  509. private CeTaskSubmit createTaskSubmit(String type) {
  510. return createTaskSubmit(type, null, null);
  511. }
  512. private CeTaskSubmit createTaskSubmit(String type, @Nullable Component component, @Nullable String submitterUuid) {
  513. return underTest.prepareSubmit()
  514. .setType(type)
  515. .setComponent(component)
  516. .setSubmitterUuid(submitterUuid)
  517. .setCharacteristics(emptyMap())
  518. .build();
  519. }
  520. private ComponentDto insertComponent(ComponentDto componentDto) {
  521. return db.components().insertComponent(componentDto);
  522. }
  523. private UserDto insertUser(UserDto userDto) {
  524. db.getDbClient().userDao().insert(session, userDto);
  525. session.commit();
  526. return userDto;
  527. }
  528. private CeQueueDto insertPendingInQueue(@Nullable Component component) {
  529. CeQueueDto dto = new CeQueueDto()
  530. .setUuid(UuidFactoryFast.getInstance().create())
  531. .setTaskType("some type")
  532. .setStatus(CeQueueDto.Status.PENDING);
  533. if (component != null) {
  534. dto.setComponentUuid(component.getUuid())
  535. .setEntityUuid(component.getEntityUuid());
  536. }
  537. db.getDbClient().ceQueueDao().insert(db.getSession(), dto);
  538. db.commit();
  539. return dto;
  540. }
  541. private static int newComponentIdGenerator = new Random().nextInt(8_999_333);
  542. private static Component newComponent(String mainComponentUuid) {
  543. return new Component("uuid_" + newComponentIdGenerator++, mainComponentUuid);
  544. }
  545. }