You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

InternalCeQueueImplTest.java 28KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689
  1. /*
  2. * SonarQube
  3. * Copyright (C) 2009-2021 SonarSource SA
  4. * mailto:info AT sonarsource DOT com
  5. *
  6. * This program is free software; you can redistribute it and/or
  7. * modify it under the terms of the GNU Lesser General Public
  8. * License as published by the Free Software Foundation; either
  9. * version 3 of the License, or (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. * Lesser General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Lesser General Public License
  17. * along with this program; if not, write to the Free Software Foundation,
  18. * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  19. */
  20. package org.sonar.ce.queue;
  21. import com.google.common.collect.ImmutableSet;
  22. import java.io.ByteArrayOutputStream;
  23. import java.io.PrintStream;
  24. import java.util.List;
  25. import java.util.Optional;
  26. import javax.annotation.Nullable;
  27. import org.junit.Before;
  28. import org.junit.Rule;
  29. import org.junit.Test;
  30. import org.sonar.api.impl.utils.AlwaysIncreasingSystem2;
  31. import org.sonar.api.utils.System2;
  32. import org.sonar.ce.container.ComputeEngineStatus;
  33. import org.sonar.ce.monitoring.CEQueueStatus;
  34. import org.sonar.ce.monitoring.CEQueueStatusImpl;
  35. import org.sonar.ce.task.CeTask;
  36. import org.sonar.ce.task.CeTaskResult;
  37. import org.sonar.ce.task.TypedException;
  38. import org.sonar.core.util.UuidFactory;
  39. import org.sonar.core.util.UuidFactoryImpl;
  40. import org.sonar.db.DbSession;
  41. import org.sonar.db.DbTester;
  42. import org.sonar.db.ce.CeActivityDto;
  43. import org.sonar.db.ce.CeQueueDto;
  44. import org.sonar.db.ce.CeQueueTesting;
  45. import org.sonar.db.ce.CeTaskTypes;
  46. import org.sonar.db.component.ComponentDto;
  47. import org.sonar.db.component.ComponentTesting;
  48. import org.sonar.db.user.UserDto;
  49. import static java.util.Arrays.asList;
  50. import static java.util.Collections.emptyMap;
  51. import static org.assertj.core.api.Assertions.assertThat;
  52. import static org.assertj.core.api.Assertions.assertThatThrownBy;
  53. import static org.assertj.core.api.Assertions.fail;
  54. import static org.mockito.ArgumentMatchers.anyLong;
  55. import static org.mockito.Mockito.mock;
  56. import static org.mockito.Mockito.verify;
  57. import static org.mockito.Mockito.verifyZeroInteractions;
  58. import static org.mockito.Mockito.when;
  59. import static org.sonar.ce.container.ComputeEngineStatus.Status.STARTED;
  60. import static org.sonar.ce.container.ComputeEngineStatus.Status.STOPPING;
  61. public class InternalCeQueueImplTest {
  62. private static final String AN_ANALYSIS_UUID = "U1";
  63. private static final String WORKER_UUID_1 = "worker uuid 1";
  64. private static final String WORKER_UUID_2 = "worker uuid 2";
  65. private System2 system2 = new AlwaysIncreasingSystem2();
  66. @Rule
  67. public DbTester db = DbTester.create(system2);
  68. private DbSession session = db.getSession();
  69. private UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE;
  70. private CEQueueStatus queueStatus = new CEQueueStatusImpl(db.getDbClient(), mock(System2.class));
  71. private ComputeEngineStatus computeEngineStatus = mock(ComputeEngineStatus.class);
  72. private InternalCeQueue underTest = new InternalCeQueueImpl(system2, db.getDbClient(), uuidFactory, queueStatus, computeEngineStatus);
  73. @Before
  74. public void setUp() {
  75. when(computeEngineStatus.getStatus()).thenReturn(STARTED);
  76. }
  77. @Test
  78. public void submit_returns_task_populated_from_CeTaskSubmit_and_creates_CeQueue_row() {
  79. CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"), "rob");
  80. CeTask task = underTest.submit(taskSubmit);
  81. UserDto userDto = db.getDbClient().userDao().selectByUuid(db.getSession(), taskSubmit.getSubmitterUuid());
  82. verifyCeTask(taskSubmit, task, null, userDto);
  83. verifyCeQueueDtoForTaskSubmit(taskSubmit);
  84. }
  85. @Test
  86. public void submit_populates_component_name_and_key_of_CeTask_if_component_exists() {
  87. ComponentDto componentDto = insertComponent(newProjectDto("PROJECT_1"));
  88. CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, componentDto, null);
  89. CeTask task = underTest.submit(taskSubmit);
  90. verifyCeTask(taskSubmit, task, componentDto, null);
  91. }
  92. @Test
  93. public void submit_returns_task_without_component_info_when_submit_has_none() {
  94. CeTaskSubmit taskSubmit = createTaskSubmit("not cpt related");
  95. CeTask task = underTest.submit(taskSubmit);
  96. verifyCeTask(taskSubmit, task, null, null);
  97. }
  98. @Test
  99. public void massSubmit_returns_tasks_for_each_CeTaskSubmit_populated_from_CeTaskSubmit_and_creates_CeQueue_row_for_each() {
  100. CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"), "rob");
  101. CeTaskSubmit taskSubmit2 = createTaskSubmit("some type");
  102. List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2));
  103. UserDto userDto1 = db.getDbClient().userDao().selectByUuid(db.getSession(), taskSubmit1.getSubmitterUuid());
  104. assertThat(tasks).hasSize(2);
  105. verifyCeTask(taskSubmit1, tasks.get(0), null, userDto1);
  106. verifyCeTask(taskSubmit2, tasks.get(1), null, null);
  107. verifyCeQueueDtoForTaskSubmit(taskSubmit1);
  108. verifyCeQueueDtoForTaskSubmit(taskSubmit2);
  109. }
  110. @Test
  111. public void massSubmit_populates_component_name_and_key_of_CeTask_if_component_exists() {
  112. ComponentDto componentDto1 = insertComponent(newProjectDto("PROJECT_1"));
  113. CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, componentDto1, null);
  114. CeTaskSubmit taskSubmit2 = createTaskSubmit("something", newProjectDto("non existing component uuid"), null);
  115. List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2));
  116. assertThat(tasks).hasSize(2);
  117. verifyCeTask(taskSubmit1, tasks.get(0), componentDto1, null);
  118. verifyCeTask(taskSubmit2, tasks.get(1), null, null);
  119. }
  120. @Test
  121. public void peek_throws_NPE_if_workerUUid_is_null() {
  122. assertThatThrownBy(() -> underTest.peek(null, true))
  123. .isInstanceOf(NullPointerException.class)
  124. .hasMessage("workerUuid can't be null");
  125. }
  126. @Test
  127. public void test_remove() {
  128. CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
  129. Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
  130. underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, null, null);
  131. // queue is empty
  132. assertThat(db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid())).isNotPresent();
  133. assertThat(underTest.peek(WORKER_UUID_2, true)).isNotPresent();
  134. // available in history
  135. Optional<CeActivityDto> history = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid());
  136. assertThat(history).isPresent();
  137. assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.SUCCESS);
  138. assertThat(history.get().getIsLast()).isTrue();
  139. assertThat(history.get().getAnalysisUuid()).isNull();
  140. }
  141. @Test
  142. public void remove_throws_IAE_if_exception_is_provided_but_status_is_SUCCESS() {
  143. assertThatThrownBy(() -> underTest.remove(mock(CeTask.class), CeActivityDto.Status.SUCCESS, null, new RuntimeException("Some error")))
  144. .isInstanceOf(IllegalArgumentException.class)
  145. .hasMessage("Error can be provided only when status is FAILED");
  146. }
  147. @Test
  148. public void remove_throws_IAE_if_exception_is_provided_but_status_is_CANCELED() {
  149. assertThatThrownBy(() -> underTest.remove(mock(CeTask.class), CeActivityDto.Status.CANCELED, null, new RuntimeException("Some error")))
  150. .isInstanceOf(IllegalArgumentException.class)
  151. .hasMessage("Error can be provided only when status is FAILED");
  152. }
  153. @Test
  154. public void remove_does_not_set_analysisUuid_in_CeActivity_when_CeTaskResult_has_no_analysis_uuid() {
  155. CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
  156. Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
  157. underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(null), null);
  158. // available in history
  159. Optional<CeActivityDto> history = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid());
  160. assertThat(history).isPresent();
  161. assertThat(history.get().getAnalysisUuid()).isNull();
  162. }
  163. @Test
  164. public void remove_sets_analysisUuid_in_CeActivity_when_CeTaskResult_has_analysis_uuid() {
  165. CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
  166. Optional<CeTask> peek = underTest.peek(WORKER_UUID_2, true);
  167. underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(AN_ANALYSIS_UUID), null);
  168. // available in history
  169. Optional<CeActivityDto> history = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid());
  170. assertThat(history).isPresent();
  171. assertThat(history.get().getAnalysisUuid()).isEqualTo("U1");
  172. }
  173. @Test
  174. public void remove_saves_error_message_and_stacktrace_when_exception_is_provided() {
  175. Throwable error = new NullPointerException("Fake NPE to test persistence to DB");
  176. CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
  177. Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
  178. underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error);
  179. Optional<CeActivityDto> activityDto = db.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid());
  180. assertThat(activityDto).isPresent();
  181. assertThat(activityDto.get().getErrorMessage()).isEqualTo(error.getMessage());
  182. assertThat(activityDto.get().getErrorStacktrace()).isEqualToIgnoringWhitespace(stacktraceToString(error));
  183. assertThat(activityDto.get().getErrorType()).isNull();
  184. }
  185. @Test
  186. public void remove_saves_error_when_TypedMessageException_is_provided() {
  187. Throwable error = new TypedExceptionImpl("aType", "aMessage");
  188. CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
  189. Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
  190. underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error);
  191. CeActivityDto activityDto = db.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid()).get();
  192. assertThat(activityDto.getErrorType()).isEqualTo("aType");
  193. assertThat(activityDto.getErrorMessage()).isEqualTo("aMessage");
  194. assertThat(activityDto.getErrorStacktrace()).isEqualToIgnoringWhitespace(stacktraceToString(error));
  195. }
  196. @Test
  197. public void remove_updates_queueStatus_success_even_if_task_does_not_exist_in_DB() {
  198. CEQueueStatus queueStatus = mock(CEQueueStatus.class);
  199. CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
  200. db.getDbClient().ceQueueDao().deleteByUuid(db.getSession(), task.getUuid());
  201. db.commit();
  202. InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatus, null);
  203. try {
  204. underTest.remove(task, CeActivityDto.Status.SUCCESS, null, null);
  205. fail("remove should have thrown a IllegalStateException");
  206. } catch (IllegalStateException e) {
  207. verify(queueStatus).addSuccess(anyLong());
  208. }
  209. }
  210. @Test
  211. public void remove_updates_queueStatus_failure_even_if_task_does_not_exist_in_DB() {
  212. CEQueueStatus queueStatusMock = mock(CEQueueStatus.class);
  213. CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
  214. db.getDbClient().ceQueueDao().deleteByUuid(db.getSession(), task.getUuid());
  215. db.commit();
  216. InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatusMock, null);
  217. try {
  218. underTest.remove(task, CeActivityDto.Status.FAILED, null, null);
  219. fail("remove should have thrown a IllegalStateException");
  220. } catch (IllegalStateException e) {
  221. verify(queueStatusMock).addError(anyLong());
  222. }
  223. }
  224. @Test
  225. public void cancelWornOuts_does_not_update_queueStatus() {
  226. CEQueueStatus queueStatusMock = mock(CEQueueStatus.class);
  227. CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
  228. db.executeUpdateSql("update ce_queue set status = 'PENDING', started_at = 123 where uuid = '" + task.getUuid() + "'");
  229. db.commit();
  230. InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatusMock, null);
  231. underTest.cancelWornOuts();
  232. assertThat(db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid())).isPresent();
  233. verifyZeroInteractions(queueStatusMock);
  234. }
  235. private static class TypedExceptionImpl extends RuntimeException implements TypedException {
  236. private final String type;
  237. private TypedExceptionImpl(String type, String message) {
  238. super(message);
  239. this.type = type;
  240. }
  241. @Override
  242. public String getType() {
  243. return type;
  244. }
  245. }
  246. @Test
  247. public void remove_copies_workerUuid() {
  248. CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
  249. .setUuid("uuid")
  250. .setTaskType("foo")
  251. .setStatus(CeQueueDto.Status.PENDING));
  252. makeInProgress(ceQueueDto, "Dustin");
  253. db.commit();
  254. underTest.remove(new CeTask.Builder()
  255. .setUuid("uuid")
  256. .setType("bar")
  257. .build(), CeActivityDto.Status.SUCCESS, null, null);
  258. CeActivityDto dto = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), "uuid").get();
  259. assertThat(dto.getWorkerUuid()).isEqualTo("Dustin");
  260. }
  261. @Test
  262. public void fail_to_remove_if_not_in_queue() {
  263. CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
  264. underTest.remove(task, CeActivityDto.Status.SUCCESS, null, null);
  265. assertThatThrownBy(() -> underTest.remove(task, CeActivityDto.Status.SUCCESS, null, null))
  266. .isInstanceOf(IllegalStateException.class);
  267. }
  268. @Test
  269. public void test_peek() {
  270. CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
  271. Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
  272. assertThat(peek).isPresent();
  273. assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
  274. assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT);
  275. assertThat(peek.get().getComponent()).contains(new CeTask.Component("PROJECT_1", null, null));
  276. assertThat(peek.get().getMainComponent()).contains(peek.get().getComponent().get());
  277. // no more pending tasks
  278. peek = underTest.peek(WORKER_UUID_2, true);
  279. assertThat(peek).isEmpty();
  280. }
  281. @Test
  282. public void peek_populates_name_and_key_for_existing_component_and_main_component() {
  283. ComponentDto project = db.components().insertPrivateProject();
  284. ComponentDto branch = db.components().insertProjectBranch(project);
  285. CeTask task = submit(CeTaskTypes.REPORT, branch);
  286. Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
  287. assertThat(peek).isPresent();
  288. assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
  289. assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT);
  290. assertThat(peek.get().getComponent()).contains(new CeTask.Component(branch.uuid(), branch.getDbKey(), branch.name()));
  291. assertThat(peek.get().getMainComponent()).contains(new CeTask.Component(project.uuid(), project.getDbKey(), project.name()));
  292. // no more pending tasks
  293. peek = underTest.peek(WORKER_UUID_2, true);
  294. assertThat(peek).isEmpty();
  295. }
  296. @Test
  297. public void peek_is_paused_then_resumed() {
  298. CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
  299. underTest.pauseWorkers();
  300. Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
  301. assertThat(peek).isEmpty();
  302. underTest.resumeWorkers();
  303. peek = underTest.peek(WORKER_UUID_1, true);
  304. assertThat(peek).isPresent();
  305. assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
  306. }
  307. @Test
  308. public void peek_ignores_in_progress_tasks() {
  309. CeQueueDto dto = db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
  310. .setUuid("uuid")
  311. .setTaskType("foo")
  312. .setStatus(CeQueueDto.Status.PENDING));
  313. makeInProgress(dto, "foo");
  314. db.commit();
  315. assertThat(underTest.peek(WORKER_UUID_1, true)).isEmpty();
  316. }
  317. @Test
  318. public void peek_nothing_if_application_status_stopping() {
  319. submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
  320. when(computeEngineStatus.getStatus()).thenReturn(STOPPING);
  321. Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
  322. assertThat(peek).isEmpty();
  323. }
  324. @Test
  325. public void peek_peeks_pending_task() {
  326. db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
  327. .setUuid("uuid")
  328. .setTaskType("foo")
  329. .setStatus(CeQueueDto.Status.PENDING));
  330. db.commit();
  331. assertThat(underTest.peek(WORKER_UUID_1, true).get().getUuid()).isEqualTo("uuid");
  332. }
  333. @Test
  334. public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_updates_updatedAt() {
  335. insertPending("u0"); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB
  336. CeQueueDto u1 = insertPending("u1");// will be picked-because older than any of the reset ones
  337. CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_1);// will be reset
  338. assertThat(underTest.peek(WORKER_UUID_1, true).get().getUuid()).isEqualTo("u0");
  339. verifyUnmodifiedTask(u1);
  340. verifyResetTask(u2);
  341. }
  342. @Test
  343. public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_only_this_uuid() {
  344. insertPending("u0"); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB
  345. CeQueueDto u1 = insertInProgress("u1", WORKER_UUID_1);
  346. CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_2);
  347. CeQueueDto u3 = insertInProgress("u3", WORKER_UUID_1);
  348. CeQueueDto u4 = insertInProgress("u4", WORKER_UUID_2);
  349. assertThat(underTest.peek(WORKER_UUID_1, true).get().getUuid()).isEqualTo("u0");
  350. verifyResetTask(u1);
  351. verifyUnmodifiedTask(u2);
  352. verifyResetTask(u3);
  353. verifyUnmodifiedTask(u4);
  354. }
  355. private void verifyResetTask(CeQueueDto originalDto) {
  356. CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(session, originalDto.getUuid()).get();
  357. assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING);
  358. assertThat(dto.getCreatedAt()).isEqualTo(originalDto.getCreatedAt());
  359. assertThat(dto.getUpdatedAt()).isGreaterThan(originalDto.getUpdatedAt());
  360. }
  361. private void verifyUnmodifiedTask(CeQueueDto originalDto) {
  362. CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(session, originalDto.getUuid()).get();
  363. assertThat(dto.getStatus()).isEqualTo(originalDto.getStatus());
  364. assertThat(dto.getCreatedAt()).isEqualTo(originalDto.getCreatedAt());
  365. assertThat(dto.getUpdatedAt()).isEqualTo(originalDto.getUpdatedAt());
  366. }
  367. private CeQueueDto insertInProgress(String uuid, String workerUuid) {
  368. CeQueueDto dto = new CeQueueDto()
  369. .setUuid(uuid)
  370. .setTaskType("foo")
  371. .setStatus(CeQueueDto.Status.PENDING);
  372. db.getDbClient().ceQueueDao().insert(session, dto);
  373. makeInProgress(dto, workerUuid);
  374. db.commit();
  375. return db.getDbClient().ceQueueDao().selectByUuid(session, uuid).get();
  376. }
  377. private CeQueueDto insertPending(String uuid) {
  378. CeQueueDto dto = new CeQueueDto()
  379. .setUuid(uuid)
  380. .setTaskType("foo")
  381. .setStatus(CeQueueDto.Status.PENDING);
  382. db.getDbClient().ceQueueDao().insert(session, dto);
  383. db.commit();
  384. return dto;
  385. }
  386. @Test
  387. public void cancel_pending() {
  388. CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
  389. CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get();
  390. underTest.cancel(db.getSession(), queueDto);
  391. Optional<CeActivityDto> activity = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid());
  392. assertThat(activity).isPresent();
  393. assertThat(activity.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
  394. }
  395. @Test
  396. public void fail_to_cancel_if_in_progress() {
  397. CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
  398. underTest.peek(WORKER_UUID_2, true);
  399. CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get();
  400. assertThatThrownBy(() -> underTest.cancel(db.getSession(), queueDto))
  401. .isInstanceOf(IllegalStateException.class)
  402. .hasMessageContaining("Task is in progress and can't be canceled");
  403. }
  404. @Test
  405. public void cancelAll_pendings_but_not_in_progress() {
  406. CeTask inProgressTask = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
  407. CeTask pendingTask1 = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_2"));
  408. CeTask pendingTask2 = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_3"));
  409. underTest.peek(WORKER_UUID_2, true);
  410. int canceledCount = underTest.cancelAll();
  411. assertThat(canceledCount).isEqualTo(2);
  412. Optional<CeActivityDto> history = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), pendingTask1.getUuid());
  413. assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
  414. history = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), pendingTask2.getUuid());
  415. assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
  416. history = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), inProgressTask.getUuid());
  417. assertThat(history).isEmpty();
  418. }
  419. @Test
  420. public void resetTasksWithUnknownWorkerUUIDs_reset_only_in_progress_tasks() {
  421. CeQueueDto u1 = insertCeQueueDto("u1");
  422. CeQueueDto u2 = insertCeQueueDto("u2");
  423. CeQueueDto u6 = insertInProgress("u6", "worker1");
  424. CeQueueDto u7 = insertInProgress("u7", "worker2");
  425. CeQueueDto u8 = insertInProgress("u8", "worker3");
  426. underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of("worker2", "worker3"));
  427. // Pending tasks must not be modified even if a workerUUID is not present
  428. verifyUnmodified(u1);
  429. verifyUnmodified(u2);
  430. // Unknown worker : null, "worker1"
  431. verifyReset(u6);
  432. // Known workers : "worker2", "worker3"
  433. verifyUnmodified(u7);
  434. verifyUnmodified(u8);
  435. }
  436. @Test
  437. public void resetTasksWithUnknownWorkerUUIDs_with_empty_set_will_reset_all_in_progress_tasks() {
  438. CeQueueDto u1 = insertCeQueueDto("u1");
  439. CeQueueDto u2 = insertCeQueueDto("u2");
  440. CeQueueDto u6 = insertInProgress("u6", "worker1");
  441. CeQueueDto u7 = insertInProgress("u7", "worker2");
  442. CeQueueDto u8 = insertInProgress("u8", "worker3");
  443. underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of());
  444. // Pending tasks must not be modified even if a workerUUID is not present
  445. verifyUnmodified(u1);
  446. verifyUnmodified(u2);
  447. // Unknown worker : null, "worker1"
  448. verifyReset(u6);
  449. verifyReset(u7);
  450. verifyReset(u8);
  451. }
  452. @Test
  453. public void resetTasksWithUnknownWorkerUUIDs_with_worker_without_tasks_will_reset_all_in_progress_tasks() {
  454. CeQueueDto u1 = insertCeQueueDto("u1");
  455. CeQueueDto u2 = insertCeQueueDto("u2");
  456. CeQueueDto u6 = insertInProgress("u6", "worker1");
  457. CeQueueDto u7 = insertInProgress("u7", "worker2");
  458. CeQueueDto u8 = insertInProgress("u8", "worker3");
  459. underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of("worker1000", "worker1001"));
  460. // Pending tasks must not be modified even if a workerUUID is not present
  461. verifyUnmodified(u1);
  462. verifyUnmodified(u2);
  463. // Unknown worker : null, "worker1"
  464. verifyReset(u6);
  465. verifyReset(u7);
  466. verifyReset(u8);
  467. }
  468. private void verifyReset(CeQueueDto original) {
  469. CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
  470. // We do not touch CreatedAt
  471. assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
  472. // Status must have changed to PENDING and must not be equal to previous status
  473. assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING).isNotEqualTo(original.getStatus());
  474. // UpdatedAt must have been updated
  475. assertThat(dto.getUpdatedAt()).isNotEqualTo(original.getUpdatedAt());
  476. assertThat(dto.getStartedAt()).isEqualTo(original.getStartedAt());
  477. // WorkerUuid must be null
  478. assertThat(dto.getWorkerUuid()).isNull();
  479. }
  480. private void verifyUnmodified(CeQueueDto original) {
  481. CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
  482. assertThat(dto.getStatus()).isEqualTo(original.getStatus());
  483. assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
  484. assertThat(dto.getUpdatedAt()).isEqualTo(original.getUpdatedAt());
  485. }
  486. private CeQueueDto insertCeQueueDto(String uuid) {
  487. CeQueueDto dto = new CeQueueDto()
  488. .setUuid(uuid)
  489. .setTaskType("foo")
  490. .setStatus(CeQueueDto.Status.PENDING);
  491. db.getDbClient().ceQueueDao().insert(db.getSession(), dto);
  492. db.commit();
  493. return dto;
  494. }
  495. private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable ComponentDto componentDto, @Nullable UserDto userDto) {
  496. assertThat(task.getUuid()).isEqualTo(taskSubmit.getUuid());
  497. assertThat(task.getType()).isEqualTo(taskSubmit.getType());
  498. if (componentDto != null) {
  499. CeTask.Component component = task.getComponent().get();
  500. assertThat(component.getUuid()).isEqualTo(componentDto.uuid());
  501. assertThat(component.getKey()).contains(componentDto.getDbKey());
  502. assertThat(component.getName()).contains(componentDto.name());
  503. } else if (taskSubmit.getComponent().isPresent()) {
  504. assertThat(task.getComponent()).contains(new CeTask.Component(taskSubmit.getComponent().get().getUuid(), null, null));
  505. } else {
  506. assertThat(task.getComponent()).isEmpty();
  507. }
  508. if (taskSubmit.getSubmitterUuid() != null) {
  509. if (userDto == null) {
  510. assertThat(task.getSubmitter().getUuid()).isEqualTo(taskSubmit.getSubmitterUuid());
  511. assertThat(task.getSubmitter().getLogin()).isNull();
  512. } else {
  513. assertThat(task.getSubmitter().getUuid()).isEqualTo(userDto.getUuid()).isEqualTo(taskSubmit.getSubmitterUuid());
  514. assertThat(task.getSubmitter().getUuid()).isEqualTo(userDto.getLogin());
  515. }
  516. }
  517. }
  518. private void verifyCeQueueDtoForTaskSubmit(CeTaskSubmit taskSubmit) {
  519. Optional<CeQueueDto> queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), taskSubmit.getUuid());
  520. assertThat(queueDto).isPresent();
  521. CeQueueDto dto = queueDto.get();
  522. assertThat(dto.getTaskType()).isEqualTo(taskSubmit.getType());
  523. Optional<CeTaskSubmit.Component> component = taskSubmit.getComponent();
  524. if (component.isPresent()) {
  525. assertThat(dto.getMainComponentUuid()).isEqualTo(component.get().getMainComponentUuid());
  526. assertThat(dto.getComponentUuid()).isEqualTo(component.get().getUuid());
  527. } else {
  528. assertThat(dto.getMainComponentUuid()).isNull();
  529. assertThat(dto.getComponentUuid()).isNull();
  530. }
  531. assertThat(dto.getSubmitterUuid()).isEqualTo(taskSubmit.getSubmitterUuid());
  532. assertThat(dto.getCreatedAt()).isEqualTo(dto.getUpdatedAt());
  533. }
  534. private ComponentDto newProjectDto(String uuid) {
  535. return ComponentTesting.newPublicProjectDto(uuid).setName("name_" + uuid).setDbKey("key_" + uuid);
  536. }
  537. private CeTask submit(String reportType, ComponentDto componentDto) {
  538. return underTest.submit(createTaskSubmit(reportType, componentDto, null));
  539. }
  540. private CeTaskSubmit createTaskSubmit(String type) {
  541. return createTaskSubmit(type, null, null);
  542. }
  543. private CeTaskSubmit createTaskSubmit(String type, @Nullable ComponentDto componentDto, @Nullable String submitterUuid) {
  544. CeTaskSubmit.Builder builder = underTest.prepareSubmit()
  545. .setType(type)
  546. .setSubmitterUuid(submitterUuid)
  547. .setCharacteristics(emptyMap());
  548. if (componentDto != null) {
  549. builder.setComponent(CeTaskSubmit.Component.fromDto(componentDto));
  550. }
  551. return builder.build();
  552. }
  553. private CeTaskResult newTaskResult(@Nullable String analysisUuid) {
  554. CeTaskResult taskResult = mock(CeTaskResult.class);
  555. when(taskResult.getAnalysisUuid()).thenReturn(java.util.Optional.ofNullable(analysisUuid));
  556. return taskResult;
  557. }
  558. private ComponentDto insertComponent(ComponentDto componentDto) {
  559. return db.components().insertComponent(componentDto);
  560. }
  561. private CeQueueDto makeInProgress(CeQueueDto ceQueueDto, String workerUuid) {
  562. CeQueueTesting.makeInProgress(session, workerUuid, system2.now(), ceQueueDto);
  563. return db.getDbClient().ceQueueDao().selectByUuid(session, ceQueueDto.getUuid()).get();
  564. }
  565. private static String stacktraceToString(Throwable error) {
  566. ByteArrayOutputStream out = new ByteArrayOutputStream();
  567. error.printStackTrace(new PrintStream(out));
  568. return out.toString();
  569. }
  570. }