You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

CeQueueImplTest.java 27KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601
  1. /*
  2. * SonarQube
  3. * Copyright (C) 2009-2021 SonarSource SA
  4. * mailto:info AT sonarsource DOT com
  5. *
  6. * This program is free software; you can redistribute it and/or
  7. * modify it under the terms of the GNU Lesser General Public
  8. * License as published by the Free Software Foundation; either
  9. * version 3 of the License, or (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. * Lesser General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Lesser General Public License
  17. * along with this program; if not, write to the Free Software Foundation,
  18. * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  19. */
  20. package org.sonar.ce.queue;
  21. import java.util.List;
  22. import java.util.Optional;
  23. import java.util.Random;
  24. import java.util.stream.Collectors;
  25. import java.util.stream.IntStream;
  26. import javax.annotation.Nullable;
  27. import org.junit.Rule;
  28. import org.junit.Test;
  29. import org.sonar.api.impl.utils.TestSystem2;
  30. import org.sonar.api.utils.System2;
  31. import org.sonar.ce.queue.CeTaskSubmit.Component;
  32. import org.sonar.ce.task.CeTask;
  33. import org.sonar.core.util.SequenceUuidFactory;
  34. import org.sonar.core.util.UuidFactory;
  35. import org.sonar.core.util.UuidFactoryFast;
  36. import org.sonar.db.DbSession;
  37. import org.sonar.db.DbTester;
  38. import org.sonar.db.ce.CeActivityDto;
  39. import org.sonar.db.ce.CeQueueDto;
  40. import org.sonar.db.ce.CeTaskTypes;
  41. import org.sonar.db.component.ComponentDto;
  42. import org.sonar.db.component.ComponentTesting;
  43. import org.sonar.db.user.UserDto;
  44. import org.sonar.db.user.UserTesting;
  45. import static com.google.common.collect.ImmutableList.of;
  46. import static java.util.Arrays.asList;
  47. import static java.util.Collections.emptyMap;
  48. import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
  49. import static org.assertj.core.api.Assertions.assertThat;
  50. import static org.assertj.core.api.Assertions.assertThatThrownBy;
  51. import static org.assertj.core.api.Assertions.catchThrowable;
  52. import static org.assertj.core.api.Assertions.tuple;
  53. import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_MAIN_COMPONENT;
  54. public class CeQueueImplTest {
  55. private static final String WORKER_UUID = "workerUuid";
  56. private static final long NOW = 1_450_000_000_000L;
  57. private System2 system2 = new TestSystem2().setNow(NOW);
  58. @Rule
  59. public DbTester db = DbTester.create(system2);
  60. private DbSession session = db.getSession();
  61. private UuidFactory uuidFactory = new SequenceUuidFactory();
  62. private CeQueue underTest = new CeQueueImpl(system2, db.getDbClient(), uuidFactory);
  63. @Test
  64. public void submit_returns_task_populated_from_CeTaskSubmit_and_creates_CeQueue_row() {
  65. String componentUuid = randomAlphabetic(3);
  66. String mainComponentUuid = randomAlphabetic(4);
  67. CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, new Component(componentUuid, mainComponentUuid), "submitter uuid");
  68. UserDto userDto = db.getDbClient().userDao().selectByUuid(db.getSession(), taskSubmit.getSubmitterUuid());
  69. CeTask task = underTest.submit(taskSubmit);
  70. verifyCeTask(taskSubmit, task, null, userDto);
  71. verifyCeQueueDtoForTaskSubmit(taskSubmit);
  72. }
  73. @Test
  74. public void submit_populates_component_name_and_key_of_CeTask_if_component_exists() {
  75. ComponentDto componentDto = insertComponent(ComponentTesting.newPrivateProjectDto("PROJECT_1"));
  76. CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, Component.fromDto(componentDto), null);
  77. CeTask task = underTest.submit(taskSubmit);
  78. verifyCeTask(taskSubmit, task, componentDto, null);
  79. }
  80. @Test
  81. public void submit_returns_task_without_component_info_when_submit_has_none() {
  82. CeTaskSubmit taskSubmit = createTaskSubmit("not cpt related");
  83. CeTask task = underTest.submit(taskSubmit);
  84. verifyCeTask(taskSubmit, task, null, null);
  85. }
  86. @Test
  87. public void submit_populates_submitter_login_of_CeTask_if_submitter_exists() {
  88. UserDto userDto = insertUser(UserTesting.newUserDto());
  89. CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, null, userDto.getUuid());
  90. CeTask task = underTest.submit(taskSubmit);
  91. verifyCeTask(taskSubmit, task, null, userDto);
  92. }
  93. @Test
  94. public void submit_with_UNIQUE_QUEUE_PER_MAIN_COMPONENT_creates_task_without_component_when_there_is_a_pending_task_without_component() {
  95. CeTaskSubmit taskSubmit = createTaskSubmit("no_component");
  96. CeQueueDto dto = insertPendingInQueue(null);
  97. Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_MAIN_COMPONENT);
  98. assertThat(task).isNotEmpty();
  99. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  100. .extracting(CeQueueDto::getUuid)
  101. .containsOnly(dto.getUuid(), task.get().getUuid());
  102. }
  103. @Test
  104. public void submit_with_UNIQUE_QUEUE_PER_MAIN_COMPONENT_creates_task_when_there_is_a_pending_task_for_another_main_component() {
  105. String mainComponentUuid = randomAlphabetic(5);
  106. String otherMainComponentUuid = randomAlphabetic(6);
  107. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  108. CeQueueDto dto = insertPendingInQueue(newComponent(otherMainComponentUuid));
  109. Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_MAIN_COMPONENT);
  110. assertThat(task).isNotEmpty();
  111. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  112. .extracting(CeQueueDto::getUuid)
  113. .containsOnly(dto.getUuid(), task.get().getUuid());
  114. }
  115. @Test
  116. public void submit_with_UNIQUE_QUEUE_PER_MAIN_COMPONENT_does_not_create_task_when_there_is_one_pending_task_for_same_main_component() {
  117. String mainComponentUuid = randomAlphabetic(5);
  118. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  119. CeQueueDto dto = insertPendingInQueue(newComponent(mainComponentUuid));
  120. Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_MAIN_COMPONENT);
  121. assertThat(task).isEmpty();
  122. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  123. .extracting(CeQueueDto::getUuid)
  124. .containsOnly(dto.getUuid());
  125. }
  126. @Test
  127. public void submit_with_UNIQUE_QUEUE_PER_MAIN_COMPONENT_does_not_create_task_when_there_is_many_pending_task_for_same_main_component() {
  128. String mainComponentUuid = randomAlphabetic(5);
  129. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  130. String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5))
  131. .mapToObj(i -> insertPendingInQueue(newComponent(mainComponentUuid)))
  132. .map(CeQueueDto::getUuid)
  133. .toArray(String[]::new);
  134. Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_MAIN_COMPONENT);
  135. assertThat(task).isEmpty();
  136. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  137. .extracting(CeQueueDto::getUuid)
  138. .containsOnly(uuids);
  139. }
  140. @Test
  141. public void submit_without_UNIQUE_QUEUE_PER_MAIN_COMPONENT_creates_task_when_there_is_one_pending_task_for_same_main_component() {
  142. String mainComponentUuid = randomAlphabetic(5);
  143. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  144. CeQueueDto dto = insertPendingInQueue(newComponent(mainComponentUuid));
  145. CeTask task = underTest.submit(taskSubmit);
  146. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  147. .extracting(CeQueueDto::getUuid)
  148. .containsOnly(dto.getUuid(), task.getUuid());
  149. }
  150. @Test
  151. public void submit_without_UNIQUE_QUEUE_PER_MAIN_COMPONENT_creates_task_when_there_is_many_pending_task_for_same_main_component() {
  152. String mainComponentUuid = randomAlphabetic(5);
  153. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  154. String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5))
  155. .mapToObj(i -> insertPendingInQueue(newComponent(mainComponentUuid)))
  156. .map(CeQueueDto::getUuid)
  157. .toArray(String[]::new);
  158. CeTask task = underTest.submit(taskSubmit);
  159. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  160. .extracting(CeQueueDto::getUuid)
  161. .hasSize(uuids.length + 1)
  162. .contains(uuids)
  163. .contains(task.getUuid());
  164. }
  165. @Test
  166. public void massSubmit_returns_tasks_for_each_CeTaskSubmit_populated_from_CeTaskSubmit_and_creates_CeQueue_row_for_each() {
  167. String mainComponentUuid = randomAlphabetic(10);
  168. CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, newComponent(mainComponentUuid), "submitter uuid");
  169. CeTaskSubmit taskSubmit2 = createTaskSubmit("some type");
  170. UserDto userDto1 = db.getDbClient().userDao().selectByUuid(db.getSession(), taskSubmit1.getSubmitterUuid());
  171. List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2));
  172. assertThat(tasks).hasSize(2);
  173. verifyCeTask(taskSubmit1, tasks.get(0), null, userDto1);
  174. verifyCeTask(taskSubmit2, tasks.get(1), null, null);
  175. verifyCeQueueDtoForTaskSubmit(taskSubmit1);
  176. verifyCeQueueDtoForTaskSubmit(taskSubmit2);
  177. }
  178. @Test
  179. public void massSubmit_populates_component_name_and_key_of_CeTask_if_project_exists() {
  180. ComponentDto componentDto1 = insertComponent(ComponentTesting.newPrivateProjectDto("PROJECT_1"));
  181. CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, Component.fromDto(componentDto1), null);
  182. CeTaskSubmit taskSubmit2 = createTaskSubmit("something", newComponent(randomAlphabetic(12)), null);
  183. List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2));
  184. assertThat(tasks).hasSize(2);
  185. verifyCeTask(taskSubmit1, tasks.get(0), componentDto1, null);
  186. verifyCeTask(taskSubmit2, tasks.get(1), null, null);
  187. }
  188. @Test
  189. public void massSubmit_populates_component_name_and_key_of_CeTask_if_project_and_branch_exists() {
  190. ComponentDto project = insertComponent(ComponentTesting.newPrivateProjectDto("PROJECT_1"));
  191. ComponentDto branch1 = db.components().insertProjectBranch(project);
  192. ComponentDto branch2 = db.components().insertProjectBranch(project);
  193. CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, Component.fromDto(branch1), null);
  194. CeTaskSubmit taskSubmit2 = createTaskSubmit("something", Component.fromDto(branch2), null);
  195. List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2));
  196. assertThat(tasks).hasSize(2);
  197. verifyCeTask(taskSubmit1, tasks.get(0), branch1, project, null);
  198. verifyCeTask(taskSubmit2, tasks.get(1), branch2, project, null);
  199. }
  200. @Test
  201. public void massSubmit_with_UNIQUE_QUEUE_PER_MAIN_COMPONENT_creates_task_without_component_when_there_is_a_pending_task_without_component() {
  202. CeTaskSubmit taskSubmit = createTaskSubmit("no_component");
  203. CeQueueDto dto = insertPendingInQueue(null);
  204. List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_MAIN_COMPONENT);
  205. assertThat(tasks).hasSize(1);
  206. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  207. .extracting(CeQueueDto::getUuid)
  208. .containsOnly(dto.getUuid(), tasks.iterator().next().getUuid());
  209. }
  210. @Test
  211. public void massSubmit_with_UNIQUE_QUEUE_PER_MAIN_COMPONENT_creates_task_when_there_is_a_pending_task_for_another_main_component() {
  212. String mainComponentUuid = randomAlphabetic(5);
  213. String otherMainComponentUuid = randomAlphabetic(6);
  214. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  215. CeQueueDto dto = insertPendingInQueue(newComponent(otherMainComponentUuid));
  216. List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_MAIN_COMPONENT);
  217. assertThat(tasks).hasSize(1);
  218. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  219. .extracting(CeQueueDto::getUuid)
  220. .containsOnly(dto.getUuid(), tasks.iterator().next().getUuid());
  221. }
  222. @Test
  223. public void massSubmit_with_UNIQUE_QUEUE_PER_MAIN_COMPONENT_does_not_create_task_when_there_is_one_pending_task_for_same_main_component() {
  224. String mainComponentUuid = randomAlphabetic(5);
  225. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  226. CeQueueDto dto = insertPendingInQueue(newComponent(mainComponentUuid));
  227. List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_MAIN_COMPONENT);
  228. assertThat(tasks).isEmpty();
  229. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  230. .extracting(CeQueueDto::getUuid)
  231. .containsOnly(dto.getUuid());
  232. }
  233. @Test
  234. public void massSubmit_with_UNIQUE_QUEUE_PER_MAIN_COMPONENT_does_not_create_task_when_there_is_many_pending_task_for_same_main_component() {
  235. String mainComponentUuid = randomAlphabetic(5);
  236. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  237. String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5))
  238. .mapToObj(i -> insertPendingInQueue(newComponent(mainComponentUuid)))
  239. .map(CeQueueDto::getUuid)
  240. .toArray(String[]::new);
  241. List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_MAIN_COMPONENT);
  242. assertThat(tasks).isEmpty();
  243. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  244. .extracting(CeQueueDto::getUuid)
  245. .containsOnly(uuids);
  246. }
  247. @Test
  248. public void massSubmit_without_UNIQUE_QUEUE_PER_MAIN_COMPONENT_creates_task_when_there_is_one_pending_task_for_other_main_component() {
  249. String mainComponentUuid = randomAlphabetic(5);
  250. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  251. CeQueueDto dto = insertPendingInQueue(newComponent(mainComponentUuid));
  252. List<CeTask> tasks = underTest.massSubmit(of(taskSubmit));
  253. assertThat(tasks).hasSize(1);
  254. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  255. .extracting(CeQueueDto::getUuid)
  256. .containsOnly(dto.getUuid(), tasks.iterator().next().getUuid());
  257. }
  258. @Test
  259. public void massSubmit_without_UNIQUE_QUEUE_PER_MAIN_COMPONENT_creates_task_when_there_is_many_pending_task_for_other_main_component() {
  260. String mainComponentUuid = randomAlphabetic(5);
  261. CeTaskSubmit taskSubmit = createTaskSubmit("with_component", newComponent(mainComponentUuid), null);
  262. String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5))
  263. .mapToObj(i -> insertPendingInQueue(newComponent(mainComponentUuid)))
  264. .map(CeQueueDto::getUuid)
  265. .toArray(String[]::new);
  266. List<CeTask> tasks = underTest.massSubmit(of(taskSubmit));
  267. assertThat(tasks).hasSize(1);
  268. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  269. .extracting(CeQueueDto::getUuid)
  270. .hasSize(uuids.length + 1)
  271. .contains(uuids)
  272. .contains(tasks.iterator().next().getUuid());
  273. }
  274. @Test
  275. public void massSubmit_with_UNIQUE_QUEUE_PER_MAIN_COMPONENT_creates_tasks_depending_on_whether_there_is_pending_task_for_same_main_component() {
  276. String mainComponentUuid1 = randomAlphabetic(5);
  277. String mainComponentUuid2 = randomAlphabetic(6);
  278. String mainComponentUuid3 = randomAlphabetic(7);
  279. String mainComponentUuid4 = randomAlphabetic(8);
  280. String mainComponentUuid5 = randomAlphabetic(9);
  281. CeTaskSubmit taskSubmit1 = createTaskSubmit("with_one_pending", newComponent(mainComponentUuid1), null);
  282. CeQueueDto dto1 = insertPendingInQueue(newComponent(mainComponentUuid1));
  283. Component componentForMainComponentUuid2 = newComponent(mainComponentUuid2);
  284. CeTaskSubmit taskSubmit2 = createTaskSubmit("no_pending", componentForMainComponentUuid2, null);
  285. CeTaskSubmit taskSubmit3 = createTaskSubmit("with_many_pending", newComponent(mainComponentUuid3), null);
  286. String[] uuids3 = IntStream.range(0, 2 + new Random().nextInt(5))
  287. .mapToObj(i -> insertPendingInQueue(newComponent(mainComponentUuid3)))
  288. .map(CeQueueDto::getUuid)
  289. .toArray(String[]::new);
  290. Component componentForMainComponentUuid4 = newComponent(mainComponentUuid4);
  291. CeTaskSubmit taskSubmit4 = createTaskSubmit("no_pending_2", componentForMainComponentUuid4, null);
  292. CeTaskSubmit taskSubmit5 = createTaskSubmit("with_pending_2", newComponent(mainComponentUuid5), null);
  293. CeQueueDto dto5 = insertPendingInQueue(newComponent(mainComponentUuid5));
  294. List<CeTask> tasks = underTest.massSubmit(of(taskSubmit1, taskSubmit2, taskSubmit3, taskSubmit4, taskSubmit5), UNIQUE_QUEUE_PER_MAIN_COMPONENT);
  295. assertThat(tasks)
  296. .hasSize(2)
  297. .extracting(task -> task.getComponent().get().getUuid(), task -> task.getMainComponent().get().getUuid())
  298. .containsOnly(tuple(componentForMainComponentUuid2.getUuid(), componentForMainComponentUuid2.getMainComponentUuid()),
  299. tuple(componentForMainComponentUuid4.getUuid(), componentForMainComponentUuid4.getMainComponentUuid()));
  300. assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
  301. .extracting(CeQueueDto::getUuid)
  302. .hasSize(1 + uuids3.length + 1 + tasks.size())
  303. .contains(dto1.getUuid())
  304. .contains(uuids3)
  305. .contains(dto5.getUuid())
  306. .containsAll(tasks.stream().map(CeTask::getUuid).collect(Collectors.toList()));
  307. }
  308. @Test
  309. public void cancel_pending() {
  310. CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  311. CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get();
  312. underTest.cancel(db.getSession(), queueDto);
  313. Optional<CeActivityDto> activity = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid());
  314. assertThat(activity).isPresent();
  315. assertThat(activity.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
  316. }
  317. @Test
  318. public void fail_to_cancel_if_in_progress() {
  319. submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(11)));
  320. CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false, false).get();
  321. assertThatThrownBy(() -> underTest.cancel(db.getSession(), ceQueueDto))
  322. .isInstanceOf(IllegalStateException.class)
  323. .hasMessageStartingWith("Task is in progress and can't be canceled");
  324. }
  325. @Test
  326. public void cancelAll_pendings_but_not_in_progress() {
  327. CeTask inProgressTask = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  328. CeTask pendingTask1 = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  329. CeTask pendingTask2 = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  330. db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false, false);
  331. int canceledCount = underTest.cancelAll();
  332. assertThat(canceledCount).isEqualTo(2);
  333. Optional<CeActivityDto> ceActivityInProgress = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), pendingTask1.getUuid());
  334. assertThat(ceActivityInProgress.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
  335. Optional<CeActivityDto> ceActivityPending1 = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), pendingTask2.getUuid());
  336. assertThat(ceActivityPending1.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
  337. Optional<CeActivityDto> ceActivityPending2 = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), inProgressTask.getUuid());
  338. assertThat(ceActivityPending2).isNotPresent();
  339. }
  340. @Test
  341. public void pauseWorkers_marks_workers_as_paused_if_zero_tasks_in_progress() {
  342. submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  343. // task is pending
  344. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
  345. underTest.pauseWorkers();
  346. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSED);
  347. }
  348. @Test
  349. public void pauseWorkers_marks_workers_as_pausing_if_some_tasks_in_progress() {
  350. submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  351. db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false, false);
  352. // task is in-progress
  353. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
  354. underTest.pauseWorkers();
  355. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSING);
  356. }
  357. @Test
  358. public void resumeWorkers_does_nothing_if_not_paused() {
  359. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
  360. underTest.resumeWorkers();
  361. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
  362. }
  363. @Test
  364. public void resumeWorkers_resumes_pausing_workers() {
  365. submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  366. db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false, false);
  367. // task is in-progress
  368. underTest.pauseWorkers();
  369. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSING);
  370. underTest.resumeWorkers();
  371. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
  372. }
  373. @Test
  374. public void resumeWorkers_resumes_paused_workers() {
  375. underTest.pauseWorkers();
  376. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSED);
  377. underTest.resumeWorkers();
  378. assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
  379. }
  380. @Test
  381. public void fail_in_progress_task() {
  382. CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  383. CeQueueDto queueDto = db.getDbClient().ceQueueDao().peek(db.getSession(), WORKER_UUID, false, false).get();
  384. underTest.fail(db.getSession(), queueDto, "TIMEOUT", "Failed on timeout");
  385. Optional<CeActivityDto> activity = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid());
  386. assertThat(activity).isPresent();
  387. assertThat(activity.get().getStatus()).isEqualTo(CeActivityDto.Status.FAILED);
  388. assertThat(activity.get().getErrorType()).isEqualTo("TIMEOUT");
  389. assertThat(activity.get().getErrorMessage()).isEqualTo("Failed on timeout");
  390. assertThat(activity.get().getExecutedAt()).isEqualTo(NOW);
  391. assertThat(activity.get().getWorkerUuid()).isEqualTo(WORKER_UUID);
  392. }
  393. @Test
  394. public void fail_throws_exception_if_task_is_pending() {
  395. CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
  396. CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get();
  397. Throwable thrown = catchThrowable(() -> underTest.fail(db.getSession(), queueDto, "TIMEOUT", "Failed on timeout"));
  398. assertThat(thrown)
  399. .isInstanceOf(IllegalStateException.class)
  400. .hasMessage("Task is not in-progress and can't be marked as failed [uuid=" + task.getUuid() + "]");
  401. }
  402. private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable ComponentDto componentDto, UserDto userDto) {
  403. verifyCeTask(taskSubmit, task, componentDto, componentDto, userDto);
  404. }
  405. private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable ComponentDto componentDto, @Nullable ComponentDto mainComponentDto, @Nullable UserDto userDto) {
  406. assertThat(task.getUuid()).isEqualTo(taskSubmit.getUuid());
  407. if (componentDto != null) {
  408. CeTask.Component component = task.getComponent().get();
  409. assertThat(component.getUuid()).isEqualTo(componentDto.uuid());
  410. assertThat(component.getKey()).contains(componentDto.getDbKey());
  411. assertThat(component.getName()).contains(componentDto.name());
  412. } else if (taskSubmit.getComponent().isPresent()) {
  413. assertThat(task.getComponent()).contains(new CeTask.Component(taskSubmit.getComponent().get().getUuid(), null, null));
  414. } else {
  415. assertThat(task.getComponent()).isEmpty();
  416. }
  417. if (mainComponentDto != null) {
  418. CeTask.Component component = task.getMainComponent().get();
  419. assertThat(component.getUuid()).isEqualTo(mainComponentDto.uuid());
  420. assertThat(component.getKey()).contains(mainComponentDto.getDbKey());
  421. assertThat(component.getName()).contains(mainComponentDto.name());
  422. } else if (taskSubmit.getComponent().isPresent()) {
  423. assertThat(task.getMainComponent()).contains(new CeTask.Component(taskSubmit.getComponent().get().getMainComponentUuid(), null, null));
  424. } else {
  425. assertThat(task.getMainComponent()).isEmpty();
  426. }
  427. assertThat(task.getType()).isEqualTo(taskSubmit.getType());
  428. if (taskSubmit.getSubmitterUuid() != null) {
  429. if (userDto == null) {
  430. assertThat(task.getSubmitter().getUuid()).isEqualTo(taskSubmit.getSubmitterUuid());
  431. assertThat(task.getSubmitter().getLogin()).isNull();
  432. } else {
  433. assertThat(task.getSubmitter().getUuid()).isEqualTo(userDto.getUuid()).isEqualTo(taskSubmit.getSubmitterUuid());
  434. assertThat(task.getSubmitter().getLogin()).isEqualTo(userDto.getLogin());
  435. }
  436. }
  437. }
  438. private void verifyCeQueueDtoForTaskSubmit(CeTaskSubmit taskSubmit) {
  439. Optional<CeQueueDto> queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), taskSubmit.getUuid());
  440. assertThat(queueDto).isPresent();
  441. assertThat(queueDto.get().getTaskType()).isEqualTo(taskSubmit.getType());
  442. Optional<Component> component = taskSubmit.getComponent();
  443. if (component.isPresent()) {
  444. assertThat(queueDto.get().getComponentUuid()).isEqualTo(component.get().getUuid());
  445. assertThat(queueDto.get().getMainComponentUuid()).isEqualTo(component.get().getMainComponentUuid());
  446. } else {
  447. assertThat(queueDto.get().getComponentUuid()).isNull();
  448. assertThat(queueDto.get().getComponentUuid()).isNull();
  449. }
  450. assertThat(queueDto.get().getSubmitterUuid()).isEqualTo(taskSubmit.getSubmitterUuid());
  451. assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L);
  452. }
  453. private CeTask submit(String reportType, Component component) {
  454. return underTest.submit(createTaskSubmit(reportType, component, null));
  455. }
  456. private CeTaskSubmit createTaskSubmit(String type) {
  457. return createTaskSubmit(type, null, null);
  458. }
  459. private CeTaskSubmit createTaskSubmit(String type, @Nullable Component component, @Nullable String submitterUuid) {
  460. return underTest.prepareSubmit()
  461. .setType(type)
  462. .setComponent(component)
  463. .setSubmitterUuid(submitterUuid)
  464. .setCharacteristics(emptyMap())
  465. .build();
  466. }
  467. private ComponentDto insertComponent(ComponentDto componentDto) {
  468. return db.components().insertComponent(componentDto);
  469. }
  470. private UserDto insertUser(UserDto userDto) {
  471. db.getDbClient().userDao().insert(session, userDto);
  472. session.commit();
  473. return userDto;
  474. }
  475. private CeQueueDto insertPendingInQueue(@Nullable Component component) {
  476. CeQueueDto dto = new CeQueueDto()
  477. .setUuid(UuidFactoryFast.getInstance().create())
  478. .setTaskType("some type")
  479. .setStatus(CeQueueDto.Status.PENDING);
  480. if (component != null) {
  481. dto.setComponentUuid(component.getUuid())
  482. .setMainComponentUuid(component.getMainComponentUuid());
  483. }
  484. db.getDbClient().ceQueueDao().insert(db.getSession(), dto);
  485. db.commit();
  486. return dto;
  487. }
  488. private static int newComponentIdGenerator = new Random().nextInt(8_999_333);
  489. private static Component newComponent(String mainComponentUuid) {
  490. return new Component("uuid_" + newComponentIdGenerator++, mainComponentUuid);
  491. }
  492. }