aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJulien Lancelot <julien.lancelot@sonarsource.com>2018-11-19 10:46:54 +0100
committerJulien Lancelot <julien.lancelot@sonarsource.com>2018-11-19 11:08:46 +0100
commit5e45d1322ed79da2ec30934276e326e3ae9273b3 (patch)
tree7be34eb4d1dfe79ce729ae0d8709295a25346113
parentc7d7b674db196194deadf63c5cd4911f4df42b32 (diff)
downloadsonarqube-5e45d1322ed79da2ec30934276e326e3ae9273b3.tar.gz
sonarqube-5e45d1322ed79da2ec30934276e326e3ae9273b3.zip
Extract concurrent ce worker test to its own class test
When tests are in the same class, a Deadlock is generated on MySQL : ### Error updating database. Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction ### The error may involve org.sonar.db.ce.CeActivityMapper.insert-Inline ### The error occurred while setting parameters ### SQL: insert into ce_activity ( uuid, component_uuid, analysis_uuid, status, task_type, is_last, is_last_key, submitter_login, submitted_at, worker_uuid, execution_count, started_at, executed_at, created_at, updated_at, execution_time_ms, error_message, error_stacktrace, error_type ) values ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ) ### Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction at org.apache.ibatis.exceptions.ExceptionFactory.wrapException(ExceptionFactory.java:30) at org.apache.ibatis.session.defaults.DefaultSqlSession.update(DefaultSqlSession.java:200) at org.apache.ibatis.session.defaults.DefaultSqlSession.insert(DefaultSqlSession.java:185) at org.apache.ibatis.binding.MapperMethod.execute(MapperMethod.java:57) at org.apache.ibatis.binding.MapperProxy.invoke(MapperProxy.java:59) at com.sun.proxy.$Proxy22.insert(Unknown Source) at org.sonar.db.ce.CeActivityDao.insert(CeActivityDao.java:55) at org.sonar.ce.queue.CeQueueImpl.remove(CeQueueImpl.java:156) at org.sonar.ce.queue.InternalCeQueueImpl.remove(InternalCeQueueImpl.java:110) at org.sonar.ce.taskprocessor.CeWorkerImpl.finalizeTask(CeWorkerImpl.java:165) at org.sonar.ce.taskprocessor.CeWorkerImpl.executeTask(CeWorkerImpl.java:148) at org.sonar.ce.taskprocessor.CeWorkerImpl.findAndProcessTask(CeWorkerImpl.java:97) at org.sonar.ce.taskprocessor.CeWorkerImpl.withCustomizedThreadName(CeWorkerImpl.java:81) at org.sonar.ce.taskprocessor.CeWorkerImpl.call(CeWorkerImpl.java:73) at org.sonar.ce.taskprocessor.CeWorkerImpl.call(CeWorkerImpl.java:43) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
-rw-r--r--tests/src/test/java/org/sonarqube/tests/Category5Suite.java2
-rw-r--r--tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java194
-rw-r--r--tests/src/test/java/org/sonarqube/tests/ce/ConcurrentCeWorkersTest.java262
3 files changed, 264 insertions, 194 deletions
diff --git a/tests/src/test/java/org/sonarqube/tests/Category5Suite.java b/tests/src/test/java/org/sonarqube/tests/Category5Suite.java
index 18209ae65d0..369ba0ff4a4 100644
--- a/tests/src/test/java/org/sonarqube/tests/Category5Suite.java
+++ b/tests/src/test/java/org/sonarqube/tests/Category5Suite.java
@@ -25,6 +25,7 @@ import org.sonarqube.tests.analysis.AnalysisEsResilienceTest;
import org.sonarqube.tests.authorisation.SystemPasscodeTest;
import org.sonarqube.tests.ce.CeShutdownTest;
import org.sonarqube.tests.ce.CeWorkersTest;
+import org.sonarqube.tests.ce.ConcurrentCeWorkersTest;
import org.sonarqube.tests.issue.IssueCreationDatePluginChangedTest;
import org.sonarqube.tests.marketplace.UpdateCenterTest;
import org.sonarqube.tests.qualityProfile.ActiveRuleEsResilienceTest;
@@ -70,6 +71,7 @@ import org.sonarqube.tests.user.UserEsResilienceTest;
// ce
CeShutdownTest.class,
CeWorkersTest.class,
+ ConcurrentCeWorkersTest.class,
// issues
IssueCreationDatePluginChangedTest.class,
diff --git a/tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java b/tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java
index 64d0625e192..ffbd8a408ca 100644
--- a/tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java
+++ b/tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java
@@ -23,67 +23,34 @@ import com.google.common.collect.ImmutableList;
import com.sonar.orchestrator.Orchestrator;
import com.sonar.orchestrator.OrchestratorBuilder;
import com.sonar.orchestrator.build.SonarScanner;
-import com.sonar.orchestrator.http.HttpMethod;
-import java.io.File;
import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import javax.annotation.Nullable;
-import org.junit.After;
import org.junit.AfterClass;
-import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import org.sonarqube.ws.WsCe;
import org.sonarqube.ws.client.PostRequest;
import org.sonarqube.ws.client.WsClient;
import org.sonarqube.ws.client.ce.ActivityWsRequest;
import util.ItUtils;
-import static com.google.common.collect.ImmutableSet.copyOf;
-import static java.lang.String.valueOf;
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toSet;
import static org.assertj.core.api.Assertions.assertThat;
import static util.ItUtils.newAdminWsClient;
import static util.ItUtils.pluginArtifact;
import static util.ItUtils.xooPlugin;
public class CeWorkersTest {
- private static final int WAIT = 200; // ms
- private static final int MAX_WAIT_LOOP = 5 * 60 * 5; // 5 minutes
-
- private static final byte BLOCKING = (byte) 0x00;
- private static final byte UNLATCHED = (byte) 0x01;
-
- private static final String STATUS_PENDING = "PENDING";
- private static final String STATUS_IN_PROGRESS = "IN_PROGRESS";
@ClassRule
public static TemporaryFolder temporaryFolder = new TemporaryFolder();
- private static File sharedMemory;
private static Orchestrator orchestrator;
private static WsClient adminWsClient;
@BeforeClass
public static void setUp() throws Exception {
- sharedMemory = temporaryFolder.newFile();
-
OrchestratorBuilder builder = Orchestrator.builderEnv()
.addPlugin(pluginArtifact("fake-governance-plugin"))
- .setServerProperty("fakeGoverance.workerLatch.sharedMemoryFile", sharedMemory.getAbsolutePath())
// overwrite default value to display heap dump on OOM and reduce max heap
.setServerProperty("sonar.ce.javaOpts", "-Xmx256m -Xms128m")
.addPlugin(xooPlugin());
@@ -101,28 +68,6 @@ public class CeWorkersTest {
}
}
- @Before
- public void setup() throws Exception {
- unlockWorkersAndResetWorkerCount();
- }
-
- @After
- public void tearDown() throws Exception {
- unlockWorkersAndResetWorkerCount();
- }
-
- private void unlockWorkersAndResetWorkerCount() throws IOException {
- RandomAccessFile randomAccessFile = null;
- try {
- randomAccessFile = new RandomAccessFile(sharedMemory, "rw");
- MappedByteBuffer mappedByteBuffer = initMappedByteBuffer(randomAccessFile);
- releaseAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer);
- updateWorkerCount(1);
- } finally {
- close(randomAccessFile);
- }
- }
-
@Test
public void ce_worker_is_resilient_to_OOM_and_ISE_during_processing_of_a_task() throws InterruptedException {
submitFakeTask("OOM");
@@ -240,145 +185,6 @@ public class CeWorkersTest {
.failIfNotSuccessful();
}
- @Test
- public void enabled_worker_count_is_initially_1_and_can_be_changed_dynamically_by_plugin() throws IOException {
- assertThat(Files.lines(orchestrator.getServer().getCeLogs().toPath())
- .filter(s -> s.contains("Compute Engine will use ")))
- .isEmpty();
-
- RandomAccessFile randomAccessFile = null;
- try {
- randomAccessFile = new RandomAccessFile(sharedMemory, "rw");
- MappedByteBuffer mappedByteBuffer = initMappedByteBuffer(randomAccessFile);
-
- verifyAnalysesRunInParallel(mappedByteBuffer, 1);
-
- /* 4 <= newWorkerCount <= 7 */
- int newWorkerCount = 4 + new Random().nextInt(4);
- updateWorkerCount(newWorkerCount);
-
- Set<String> line = Files.lines(orchestrator.getServer().getCeLogs().toPath())
- .filter(s -> s.contains("Compute Engine will use "))
- .collect(Collectors.toSet());
- assertThat(line).hasSize(1);
- assertThat(line.iterator().next()).contains(valueOf(newWorkerCount));
-
- verifyAnalysesRunInParallel(mappedByteBuffer, newWorkerCount);
-
- int lowerWorkerCount = 3;
- updateWorkerCount(lowerWorkerCount);
- verifyAnalysesRunInParallel(mappedByteBuffer, lowerWorkerCount);
- } finally {
- close(randomAccessFile);
- }
- }
-
- private void updateWorkerCount(int newWorkerCount) {
- orchestrator.getServer()
- .newHttpCall("api/ce/refreshWorkerCount")
- .setMethod(HttpMethod.POST)
- .setParam("count", valueOf(newWorkerCount))
- .execute();
- }
-
- private void verifyAnalysesRunInParallel(MappedByteBuffer mappedByteBuffer, int workerCount) {
- assertThat(adminWsClient.ce().workerCount())
- .extracting(WsCe.WorkerCountResponse::getValue, WsCe.WorkerCountResponse::getCanSetWorkerCount)
- .containsOnly(workerCount, true);
-
- blockAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer);
-
- // start analysis of workerCount + 2 projects
- List<String> projectKeys = IntStream.range(0, workerCount + 2).mapToObj(i -> "prj" + i).collect(toList());
- for (String projectKey : projectKeys) {
- SonarScanner sonarRunner = SonarScanner.create(ItUtils.projectDir("shared/xoo-sample"))
- .setProperties("sonar.projectKey", projectKey);
- orchestrator.executeBuild(sonarRunner, false);
- }
-
- List<WsCe.Task> tasksList = waitForWsCallStatus(
- this::getTasksAllTasks,
- (tasks) -> verifyInProgressTaskCount(tasks, workerCount));
-
- assertThat(tasksList.stream()
- .filter(CeWorkersTest::pending)
- .map(WsCe.Task::getComponentKey)
- .collect(toSet()))
- .isEqualTo(copyOf(projectKeys.subList(workerCount, projectKeys.size())));
- assertThat(tasksList.stream()
- .filter(CeWorkersTest::inProgress)
- .map(WsCe.Task::getComponentKey)
- .collect(toSet()))
- .isEqualTo(copyOf(projectKeys.subList(0, workerCount)));
-
- releaseAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer);
-
- waitForWsCallStatus(this::getTasksAllTasks, List::isEmpty);
- }
-
- private static MappedByteBuffer initMappedByteBuffer(RandomAccessFile randomAccessFile) throws IOException {
- return randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1);
- }
-
- private void releaseAnyAnalysisWithFakeGovernancePlugin(MappedByteBuffer mappedByteBuffer) {
- // let the blocked analyses finish running
- mappedByteBuffer.put(0, UNLATCHED);
- }
-
- private static void blockAnyAnalysisWithFakeGovernancePlugin(MappedByteBuffer mappedByteBuffer) {
- // block any analysis which will run with the fake-governance-plugin
- mappedByteBuffer.put(0, BLOCKING);
- }
-
- private void close(@Nullable RandomAccessFile randomAccessFile) throws IOException {
- if (randomAccessFile != null) {
- randomAccessFile.close();
- }
- }
-
- private static boolean verifyInProgressTaskCount(List<WsCe.Task> tasksList, int workerCount) {
- return tasksList.stream().filter(CeWorkersTest::inProgress).count() >= workerCount;
- }
-
- private static boolean pending(WsCe.Task task) {
- return WsCe.TaskStatus.PENDING == task.getStatus();
- }
-
- private static boolean inProgress(WsCe.Task task) {
- return WsCe.TaskStatus.IN_PROGRESS == task.getStatus();
- }
-
- private List<WsCe.Task> getTasksAllTasks(WsClient wsClient) {
- return wsClient.ce().activity(new ActivityWsRequest()
- .setStatus(ImmutableList.of(STATUS_PENDING, STATUS_IN_PROGRESS)))
- .getTasksList();
- }
-
- private <T> T waitForWsCallStatus(Function<WsClient, T> call, Predicate<T> test) {
- WsClient wsClient = ItUtils.newAdminWsClient(orchestrator);
- int i = 0;
- T returnValue = call.apply(wsClient);
- boolean expectedState = test.test(returnValue);
- while (i < MAX_WAIT_LOOP && !expectedState) {
- waitInterruptedly();
- i++;
- returnValue = call.apply(wsClient);
- expectedState = test.test(returnValue);
- }
- assertThat(expectedState)
- .as("Failed to wait for expected queue status. Last call returned:\n%s", returnValue)
- .isTrue();
- return returnValue;
- }
-
- private static void waitInterruptedly() {
- try {
- Thread.sleep(WAIT);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
private void waitForEmptyQueue() throws InterruptedException {
int delay = 200;
int timeout = 5 * 10; // 10 seconds
diff --git a/tests/src/test/java/org/sonarqube/tests/ce/ConcurrentCeWorkersTest.java b/tests/src/test/java/org/sonarqube/tests/ce/ConcurrentCeWorkersTest.java
new file mode 100644
index 00000000000..1173fe05f80
--- /dev/null
+++ b/tests/src/test/java/org/sonarqube/tests/ce/ConcurrentCeWorkersTest.java
@@ -0,0 +1,262 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonarqube.tests.ce;
+
+import com.google.common.collect.ImmutableList;
+import com.sonar.orchestrator.Orchestrator;
+import com.sonar.orchestrator.OrchestratorBuilder;
+import com.sonar.orchestrator.build.SonarScanner;
+import com.sonar.orchestrator.http.HttpMethod;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.annotation.Nullable;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.sonarqube.ws.WsCe;
+import org.sonarqube.ws.client.WsClient;
+import org.sonarqube.ws.client.ce.ActivityWsRequest;
+import util.ItUtils;
+
+import static com.google.common.collect.ImmutableSet.copyOf;
+import static java.lang.String.valueOf;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.assertj.core.api.Assertions.assertThat;
+import static util.ItUtils.newAdminWsClient;
+import static util.ItUtils.pluginArtifact;
+import static util.ItUtils.xooPlugin;
+
+public class ConcurrentCeWorkersTest {
+ private static final int WAIT = 200; // ms
+ private static final int MAX_WAIT_LOOP = 5 * 60 * 5; // 5 minutes
+
+ private static final byte BLOCKING = (byte) 0x00;
+ private static final byte UNLATCHED = (byte) 0x01;
+
+ private static final String STATUS_PENDING = "PENDING";
+ private static final String STATUS_IN_PROGRESS = "IN_PROGRESS";
+
+ @ClassRule
+ public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private static File sharedMemory;
+ private static Orchestrator orchestrator;
+ private static WsClient adminWsClient;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ sharedMemory = temporaryFolder.newFile();
+
+ OrchestratorBuilder builder = Orchestrator.builderEnv()
+ .addPlugin(pluginArtifact("fake-governance-plugin"))
+ .setServerProperty("fakeGoverance.workerLatch.sharedMemoryFile", sharedMemory.getAbsolutePath())
+ .addPlugin(xooPlugin());
+ orchestrator = builder.build();
+ orchestrator.start();
+
+ adminWsClient = newAdminWsClient(orchestrator);
+ }
+
+ @AfterClass
+ public static void stop() {
+ if (orchestrator != null) {
+ orchestrator.stop();
+ orchestrator = null;
+ }
+ }
+
+ @Before
+ public void setup() throws Exception {
+ unlockWorkersAndResetWorkerCount();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ unlockWorkersAndResetWorkerCount();
+ }
+
+ private void unlockWorkersAndResetWorkerCount() throws IOException {
+ RandomAccessFile randomAccessFile = null;
+ try {
+ randomAccessFile = new RandomAccessFile(sharedMemory, "rw");
+ MappedByteBuffer mappedByteBuffer = initMappedByteBuffer(randomAccessFile);
+ releaseAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer);
+ updateWorkerCount(1);
+ } finally {
+ close(randomAccessFile);
+ }
+ }
+
+ @Test
+ public void enabled_worker_count_is_initially_1_and_can_be_changed_dynamically_by_plugin() throws IOException {
+ assertThat(Files.lines(orchestrator.getServer().getCeLogs().toPath())
+ .filter(s -> s.contains("Compute Engine will use ")))
+ .isEmpty();
+
+ RandomAccessFile randomAccessFile = null;
+ try {
+ randomAccessFile = new RandomAccessFile(sharedMemory, "rw");
+ MappedByteBuffer mappedByteBuffer = initMappedByteBuffer(randomAccessFile);
+
+ verifyAnalysesRunInParallel(mappedByteBuffer, 1);
+
+ /* 4 <= newWorkerCount <= 7 */
+ int newWorkerCount = 4 + new Random().nextInt(4);
+ updateWorkerCount(newWorkerCount);
+
+ Set<String> line = Files.lines(orchestrator.getServer().getCeLogs().toPath())
+ .filter(s -> s.contains("Compute Engine will use "))
+ .collect(Collectors.toSet());
+ assertThat(line).hasSize(1);
+ assertThat(line.iterator().next()).contains(valueOf(newWorkerCount));
+
+ verifyAnalysesRunInParallel(mappedByteBuffer, newWorkerCount);
+
+ int lowerWorkerCount = 3;
+ updateWorkerCount(lowerWorkerCount);
+ verifyAnalysesRunInParallel(mappedByteBuffer, lowerWorkerCount);
+ } finally {
+ close(randomAccessFile);
+ }
+ }
+
+ private void updateWorkerCount(int newWorkerCount) {
+ orchestrator.getServer()
+ .newHttpCall("api/ce/refreshWorkerCount")
+ .setMethod(HttpMethod.POST)
+ .setParam("count", valueOf(newWorkerCount))
+ .execute();
+ }
+
+ private void verifyAnalysesRunInParallel(MappedByteBuffer mappedByteBuffer, int workerCount) {
+ assertThat(adminWsClient.ce().workerCount())
+ .extracting(WsCe.WorkerCountResponse::getValue, WsCe.WorkerCountResponse::getCanSetWorkerCount)
+ .containsOnly(workerCount, true);
+
+ blockAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer);
+
+ // start analysis of workerCount + 2 projects
+ List<String> projectKeys = IntStream.range(0, workerCount + 2).mapToObj(i -> "prj" + i).collect(toList());
+ for (String projectKey : projectKeys) {
+ SonarScanner sonarRunner = SonarScanner.create(ItUtils.projectDir("shared/xoo-sample"))
+ .setProperties("sonar.projectKey", projectKey);
+ orchestrator.executeBuild(sonarRunner, false);
+ }
+
+ List<WsCe.Task> tasksList = waitForWsCallStatus(
+ this::getTasksAllTasks,
+ (tasks) -> verifyInProgressTaskCount(tasks, workerCount));
+
+ assertThat(tasksList.stream()
+ .filter(ConcurrentCeWorkersTest::pending)
+ .map(WsCe.Task::getComponentKey)
+ .collect(toSet()))
+ .isEqualTo(copyOf(projectKeys.subList(workerCount, projectKeys.size())));
+ assertThat(tasksList.stream()
+ .filter(ConcurrentCeWorkersTest::inProgress)
+ .map(WsCe.Task::getComponentKey)
+ .collect(toSet()))
+ .isEqualTo(copyOf(projectKeys.subList(0, workerCount)));
+
+ releaseAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer);
+
+ waitForWsCallStatus(this::getTasksAllTasks, List::isEmpty);
+ }
+
+ private static MappedByteBuffer initMappedByteBuffer(RandomAccessFile randomAccessFile) throws IOException {
+ return randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1);
+ }
+
+ private void releaseAnyAnalysisWithFakeGovernancePlugin(MappedByteBuffer mappedByteBuffer) {
+ // let the blocked analyses finish running
+ mappedByteBuffer.put(0, UNLATCHED);
+ }
+
+ private static void blockAnyAnalysisWithFakeGovernancePlugin(MappedByteBuffer mappedByteBuffer) {
+ // block any analysis which will run with the fake-governance-plugin
+ mappedByteBuffer.put(0, BLOCKING);
+ }
+
+ private void close(@Nullable RandomAccessFile randomAccessFile) throws IOException {
+ if (randomAccessFile != null) {
+ randomAccessFile.close();
+ }
+ }
+
+ private static boolean verifyInProgressTaskCount(List<WsCe.Task> tasksList, int workerCount) {
+ return tasksList.stream().filter(ConcurrentCeWorkersTest::inProgress).count() >= workerCount;
+ }
+
+ private static boolean pending(WsCe.Task task) {
+ return WsCe.TaskStatus.PENDING == task.getStatus();
+ }
+
+ private static boolean inProgress(WsCe.Task task) {
+ return WsCe.TaskStatus.IN_PROGRESS == task.getStatus();
+ }
+
+ private List<WsCe.Task> getTasksAllTasks(WsClient wsClient) {
+ return wsClient.ce().activity(new ActivityWsRequest()
+ .setStatus(ImmutableList.of(STATUS_PENDING, STATUS_IN_PROGRESS)))
+ .getTasksList();
+ }
+
+ private <T> T waitForWsCallStatus(Function<WsClient, T> call, Predicate<T> test) {
+ WsClient wsClient = ItUtils.newAdminWsClient(orchestrator);
+ int i = 0;
+ T returnValue = call.apply(wsClient);
+ boolean expectedState = test.test(returnValue);
+ while (i < MAX_WAIT_LOOP && !expectedState) {
+ waitInterruptedly();
+ i++;
+ returnValue = call.apply(wsClient);
+ expectedState = test.test(returnValue);
+ }
+ assertThat(expectedState)
+ .as("Failed to wait for expected queue status. Last call returned:\n%s", returnValue)
+ .isTrue();
+ return returnValue;
+ }
+
+ private static void waitInterruptedly() {
+ try {
+ Thread.sleep(WAIT);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+}