]> source.dussan.org Git - sonarqube.git/commitdiff
Extract concurrent ce worker test to its own class test
authorJulien Lancelot <julien.lancelot@sonarsource.com>
Mon, 19 Nov 2018 09:46:54 +0000 (10:46 +0100)
committerJulien Lancelot <julien.lancelot@sonarsource.com>
Mon, 19 Nov 2018 10:08:46 +0000 (11:08 +0100)
When tests are in the same class, a Deadlock is generated on MySQL :
### Error updating database.  Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
### The error may involve org.sonar.db.ce.CeActivityMapper.insert-Inline
### The error occurred while setting parameters
### SQL: insert into ce_activity (       uuid,       component_uuid,       analysis_uuid,       status,       task_type,       is_last,       is_last_key,       submitter_login,       submitted_at,       worker_uuid,       execution_count,       started_at,       executed_at,       created_at,       updated_at,       execution_time_ms,       error_message,       error_stacktrace,       error_type     )     values (       ?,       ?,       ?,       ?,       ?,       ?,       ?,       ?,       ?,       ?,       ?,       ?,       ?,       ?,       ?,       ?,       ?,       ?,       ?     )
### Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
at org.apache.ibatis.exceptions.ExceptionFactory.wrapException(ExceptionFactory.java:30)
at org.apache.ibatis.session.defaults.DefaultSqlSession.update(DefaultSqlSession.java:200)
at org.apache.ibatis.session.defaults.DefaultSqlSession.insert(DefaultSqlSession.java:185)
at org.apache.ibatis.binding.MapperMethod.execute(MapperMethod.java:57)
at org.apache.ibatis.binding.MapperProxy.invoke(MapperProxy.java:59)
at com.sun.proxy.$Proxy22.insert(Unknown Source)
at org.sonar.db.ce.CeActivityDao.insert(CeActivityDao.java:55)
at org.sonar.ce.queue.CeQueueImpl.remove(CeQueueImpl.java:156)
at org.sonar.ce.queue.InternalCeQueueImpl.remove(InternalCeQueueImpl.java:110)
at org.sonar.ce.taskprocessor.CeWorkerImpl.finalizeTask(CeWorkerImpl.java:165)
at org.sonar.ce.taskprocessor.CeWorkerImpl.executeTask(CeWorkerImpl.java:148)
at org.sonar.ce.taskprocessor.CeWorkerImpl.findAndProcessTask(CeWorkerImpl.java:97)
at org.sonar.ce.taskprocessor.CeWorkerImpl.withCustomizedThreadName(CeWorkerImpl.java:81)
at org.sonar.ce.taskprocessor.CeWorkerImpl.call(CeWorkerImpl.java:73)
at org.sonar.ce.taskprocessor.CeWorkerImpl.call(CeWorkerImpl.java:43)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

tests/src/test/java/org/sonarqube/tests/Category5Suite.java
tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java
tests/src/test/java/org/sonarqube/tests/ce/ConcurrentCeWorkersTest.java [new file with mode: 0644]

index 18209ae65d011cd34c72cba053f581120d2e3126..369ba0ff4a47591b9d21d544fa6433d63dd76444 100644 (file)
@@ -25,6 +25,7 @@ import org.sonarqube.tests.analysis.AnalysisEsResilienceTest;
 import org.sonarqube.tests.authorisation.SystemPasscodeTest;
 import org.sonarqube.tests.ce.CeShutdownTest;
 import org.sonarqube.tests.ce.CeWorkersTest;
+import org.sonarqube.tests.ce.ConcurrentCeWorkersTest;
 import org.sonarqube.tests.issue.IssueCreationDatePluginChangedTest;
 import org.sonarqube.tests.marketplace.UpdateCenterTest;
 import org.sonarqube.tests.qualityProfile.ActiveRuleEsResilienceTest;
@@ -70,6 +71,7 @@ import org.sonarqube.tests.user.UserEsResilienceTest;
   // ce
   CeShutdownTest.class,
   CeWorkersTest.class,
+  ConcurrentCeWorkersTest.class,
   // issues
   IssueCreationDatePluginChangedTest.class,
 
index 64d0625e1922db771d4890af48f1caa33420333b..ffbd8a408ca45b73f61e8bf12731d53d7a9bc250 100644 (file)
@@ -23,67 +23,34 @@ import com.google.common.collect.ImmutableList;
 import com.sonar.orchestrator.Orchestrator;
 import com.sonar.orchestrator.OrchestratorBuilder;
 import com.sonar.orchestrator.build.SonarScanner;
-import com.sonar.orchestrator.http.HttpMethod;
-import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import javax.annotation.Nullable;
-import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.sonarqube.ws.WsCe;
 import org.sonarqube.ws.client.PostRequest;
 import org.sonarqube.ws.client.WsClient;
 import org.sonarqube.ws.client.ce.ActivityWsRequest;
 import util.ItUtils;
 
-import static com.google.common.collect.ImmutableSet.copyOf;
-import static java.lang.String.valueOf;
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toSet;
 import static org.assertj.core.api.Assertions.assertThat;
 import static util.ItUtils.newAdminWsClient;
 import static util.ItUtils.pluginArtifact;
 import static util.ItUtils.xooPlugin;
 
 public class CeWorkersTest {
-  private static final int WAIT = 200; // ms
-  private static final int MAX_WAIT_LOOP = 5 * 60 * 5; // 5 minutes
-
-  private static final byte BLOCKING = (byte) 0x00;
-  private static final byte UNLATCHED = (byte) 0x01;
-
-  private static final String STATUS_PENDING = "PENDING";
-  private static final String STATUS_IN_PROGRESS = "IN_PROGRESS";
 
   @ClassRule
   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-  private static File sharedMemory;
   private static Orchestrator orchestrator;
   private static WsClient adminWsClient;
 
   @BeforeClass
   public static void setUp() throws Exception {
-    sharedMemory = temporaryFolder.newFile();
-
     OrchestratorBuilder builder = Orchestrator.builderEnv()
       .addPlugin(pluginArtifact("fake-governance-plugin"))
-      .setServerProperty("fakeGoverance.workerLatch.sharedMemoryFile", sharedMemory.getAbsolutePath())
       // overwrite default value to display heap dump on OOM and reduce max heap
       .setServerProperty("sonar.ce.javaOpts", "-Xmx256m -Xms128m")
       .addPlugin(xooPlugin());
@@ -101,28 +68,6 @@ public class CeWorkersTest {
     }
   }
 
-  @Before
-  public void setup() throws Exception {
-    unlockWorkersAndResetWorkerCount();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    unlockWorkersAndResetWorkerCount();
-  }
-
-  private void unlockWorkersAndResetWorkerCount() throws IOException {
-    RandomAccessFile randomAccessFile = null;
-    try {
-      randomAccessFile = new RandomAccessFile(sharedMemory, "rw");
-      MappedByteBuffer mappedByteBuffer = initMappedByteBuffer(randomAccessFile);
-      releaseAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer);
-      updateWorkerCount(1);
-    } finally {
-      close(randomAccessFile);
-    }
-  }
-
   @Test
   public void ce_worker_is_resilient_to_OOM_and_ISE_during_processing_of_a_task() throws InterruptedException {
     submitFakeTask("OOM");
@@ -240,145 +185,6 @@ public class CeWorkersTest {
       .failIfNotSuccessful();
   }
 
-  @Test
-  public void enabled_worker_count_is_initially_1_and_can_be_changed_dynamically_by_plugin() throws IOException {
-    assertThat(Files.lines(orchestrator.getServer().getCeLogs().toPath())
-      .filter(s -> s.contains("Compute Engine will use ")))
-        .isEmpty();
-
-    RandomAccessFile randomAccessFile = null;
-    try {
-      randomAccessFile = new RandomAccessFile(sharedMemory, "rw");
-      MappedByteBuffer mappedByteBuffer = initMappedByteBuffer(randomAccessFile);
-
-      verifyAnalysesRunInParallel(mappedByteBuffer, 1);
-
-      /* 4 <= newWorkerCount <= 7 */
-      int newWorkerCount = 4 + new Random().nextInt(4);
-      updateWorkerCount(newWorkerCount);
-
-      Set<String> line = Files.lines(orchestrator.getServer().getCeLogs().toPath())
-        .filter(s -> s.contains("Compute Engine will use "))
-        .collect(Collectors.toSet());
-      assertThat(line).hasSize(1);
-      assertThat(line.iterator().next()).contains(valueOf(newWorkerCount));
-
-      verifyAnalysesRunInParallel(mappedByteBuffer, newWorkerCount);
-
-      int lowerWorkerCount = 3;
-      updateWorkerCount(lowerWorkerCount);
-      verifyAnalysesRunInParallel(mappedByteBuffer, lowerWorkerCount);
-    } finally {
-      close(randomAccessFile);
-    }
-  }
-
-  private void updateWorkerCount(int newWorkerCount) {
-    orchestrator.getServer()
-      .newHttpCall("api/ce/refreshWorkerCount")
-      .setMethod(HttpMethod.POST)
-      .setParam("count", valueOf(newWorkerCount))
-      .execute();
-  }
-
-  private void verifyAnalysesRunInParallel(MappedByteBuffer mappedByteBuffer, int workerCount) {
-    assertThat(adminWsClient.ce().workerCount())
-      .extracting(WsCe.WorkerCountResponse::getValue, WsCe.WorkerCountResponse::getCanSetWorkerCount)
-      .containsOnly(workerCount, true);
-
-    blockAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer);
-
-    // start analysis of workerCount + 2 projects
-    List<String> projectKeys = IntStream.range(0, workerCount + 2).mapToObj(i -> "prj" + i).collect(toList());
-    for (String projectKey : projectKeys) {
-      SonarScanner sonarRunner = SonarScanner.create(ItUtils.projectDir("shared/xoo-sample"))
-        .setProperties("sonar.projectKey", projectKey);
-      orchestrator.executeBuild(sonarRunner, false);
-    }
-
-    List<WsCe.Task> tasksList = waitForWsCallStatus(
-      this::getTasksAllTasks,
-      (tasks) -> verifyInProgressTaskCount(tasks, workerCount));
-
-    assertThat(tasksList.stream()
-      .filter(CeWorkersTest::pending)
-      .map(WsCe.Task::getComponentKey)
-      .collect(toSet()))
-        .isEqualTo(copyOf(projectKeys.subList(workerCount, projectKeys.size())));
-    assertThat(tasksList.stream()
-      .filter(CeWorkersTest::inProgress)
-      .map(WsCe.Task::getComponentKey)
-      .collect(toSet()))
-        .isEqualTo(copyOf(projectKeys.subList(0, workerCount)));
-
-    releaseAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer);
-
-    waitForWsCallStatus(this::getTasksAllTasks, List::isEmpty);
-  }
-
-  private static MappedByteBuffer initMappedByteBuffer(RandomAccessFile randomAccessFile) throws IOException {
-    return randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1);
-  }
-
-  private void releaseAnyAnalysisWithFakeGovernancePlugin(MappedByteBuffer mappedByteBuffer) {
-    // let the blocked analyses finish running
-    mappedByteBuffer.put(0, UNLATCHED);
-  }
-
-  private static void blockAnyAnalysisWithFakeGovernancePlugin(MappedByteBuffer mappedByteBuffer) {
-    // block any analysis which will run with the fake-governance-plugin
-    mappedByteBuffer.put(0, BLOCKING);
-  }
-
-  private void close(@Nullable RandomAccessFile randomAccessFile) throws IOException {
-    if (randomAccessFile != null) {
-      randomAccessFile.close();
-    }
-  }
-
-  private static boolean verifyInProgressTaskCount(List<WsCe.Task> tasksList, int workerCount) {
-    return tasksList.stream().filter(CeWorkersTest::inProgress).count() >= workerCount;
-  }
-
-  private static boolean pending(WsCe.Task task) {
-    return WsCe.TaskStatus.PENDING == task.getStatus();
-  }
-
-  private static boolean inProgress(WsCe.Task task) {
-    return WsCe.TaskStatus.IN_PROGRESS == task.getStatus();
-  }
-
-  private List<WsCe.Task> getTasksAllTasks(WsClient wsClient) {
-    return wsClient.ce().activity(new ActivityWsRequest()
-      .setStatus(ImmutableList.of(STATUS_PENDING, STATUS_IN_PROGRESS)))
-      .getTasksList();
-  }
-
-  private <T> T waitForWsCallStatus(Function<WsClient, T> call, Predicate<T> test) {
-    WsClient wsClient = ItUtils.newAdminWsClient(orchestrator);
-    int i = 0;
-    T returnValue = call.apply(wsClient);
-    boolean expectedState = test.test(returnValue);
-    while (i < MAX_WAIT_LOOP && !expectedState) {
-      waitInterruptedly();
-      i++;
-      returnValue = call.apply(wsClient);
-      expectedState = test.test(returnValue);
-    }
-    assertThat(expectedState)
-      .as("Failed to wait for expected queue status. Last call returned:\n%s", returnValue)
-      .isTrue();
-    return returnValue;
-  }
-
-  private static void waitInterruptedly() {
-    try {
-      Thread.sleep(WAIT);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-  }
-
   private void waitForEmptyQueue() throws InterruptedException {
     int delay = 200;
     int timeout = 5 * 10; // 10 seconds
diff --git a/tests/src/test/java/org/sonarqube/tests/ce/ConcurrentCeWorkersTest.java b/tests/src/test/java/org/sonarqube/tests/ce/ConcurrentCeWorkersTest.java
new file mode 100644 (file)
index 0000000..1173fe0
--- /dev/null
@@ -0,0 +1,262 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonarqube.tests.ce;
+
+import com.google.common.collect.ImmutableList;
+import com.sonar.orchestrator.Orchestrator;
+import com.sonar.orchestrator.OrchestratorBuilder;
+import com.sonar.orchestrator.build.SonarScanner;
+import com.sonar.orchestrator.http.HttpMethod;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.annotation.Nullable;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.sonarqube.ws.WsCe;
+import org.sonarqube.ws.client.WsClient;
+import org.sonarqube.ws.client.ce.ActivityWsRequest;
+import util.ItUtils;
+
+import static com.google.common.collect.ImmutableSet.copyOf;
+import static java.lang.String.valueOf;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.assertj.core.api.Assertions.assertThat;
+import static util.ItUtils.newAdminWsClient;
+import static util.ItUtils.pluginArtifact;
+import static util.ItUtils.xooPlugin;
+
+public class ConcurrentCeWorkersTest {
+  private static final int WAIT = 200; // ms
+  private static final int MAX_WAIT_LOOP = 5 * 60 * 5; // 5 minutes
+
+  private static final byte BLOCKING = (byte) 0x00;
+  private static final byte UNLATCHED = (byte) 0x01;
+
+  private static final String STATUS_PENDING = "PENDING";
+  private static final String STATUS_IN_PROGRESS = "IN_PROGRESS";
+
+  @ClassRule
+  public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private static File sharedMemory;
+  private static Orchestrator orchestrator;
+  private static WsClient adminWsClient;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    sharedMemory = temporaryFolder.newFile();
+
+    OrchestratorBuilder builder = Orchestrator.builderEnv()
+      .addPlugin(pluginArtifact("fake-governance-plugin"))
+      .setServerProperty("fakeGoverance.workerLatch.sharedMemoryFile", sharedMemory.getAbsolutePath())
+      .addPlugin(xooPlugin());
+    orchestrator = builder.build();
+    orchestrator.start();
+
+    adminWsClient = newAdminWsClient(orchestrator);
+  }
+
+  @AfterClass
+  public static void stop() {
+    if (orchestrator != null) {
+      orchestrator.stop();
+      orchestrator = null;
+    }
+  }
+
+  @Before
+  public void setup() throws Exception {
+    unlockWorkersAndResetWorkerCount();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    unlockWorkersAndResetWorkerCount();
+  }
+
+  private void unlockWorkersAndResetWorkerCount() throws IOException {
+    RandomAccessFile randomAccessFile = null;
+    try {
+      randomAccessFile = new RandomAccessFile(sharedMemory, "rw");
+      MappedByteBuffer mappedByteBuffer = initMappedByteBuffer(randomAccessFile);
+      releaseAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer);
+      updateWorkerCount(1);
+    } finally {
+      close(randomAccessFile);
+    }
+  }
+
+  @Test
+  public void enabled_worker_count_is_initially_1_and_can_be_changed_dynamically_by_plugin() throws IOException {
+    assertThat(Files.lines(orchestrator.getServer().getCeLogs().toPath())
+      .filter(s -> s.contains("Compute Engine will use ")))
+        .isEmpty();
+
+    RandomAccessFile randomAccessFile = null;
+    try {
+      randomAccessFile = new RandomAccessFile(sharedMemory, "rw");
+      MappedByteBuffer mappedByteBuffer = initMappedByteBuffer(randomAccessFile);
+
+      verifyAnalysesRunInParallel(mappedByteBuffer, 1);
+
+      /* 4 <= newWorkerCount <= 7 */
+      int newWorkerCount = 4 + new Random().nextInt(4);
+      updateWorkerCount(newWorkerCount);
+
+      Set<String> line = Files.lines(orchestrator.getServer().getCeLogs().toPath())
+        .filter(s -> s.contains("Compute Engine will use "))
+        .collect(Collectors.toSet());
+      assertThat(line).hasSize(1);
+      assertThat(line.iterator().next()).contains(valueOf(newWorkerCount));
+
+      verifyAnalysesRunInParallel(mappedByteBuffer, newWorkerCount);
+
+      int lowerWorkerCount = 3;
+      updateWorkerCount(lowerWorkerCount);
+      verifyAnalysesRunInParallel(mappedByteBuffer, lowerWorkerCount);
+    } finally {
+      close(randomAccessFile);
+    }
+  }
+
+  private void updateWorkerCount(int newWorkerCount) {
+    orchestrator.getServer()
+      .newHttpCall("api/ce/refreshWorkerCount")
+      .setMethod(HttpMethod.POST)
+      .setParam("count", valueOf(newWorkerCount))
+      .execute();
+  }
+
+  private void verifyAnalysesRunInParallel(MappedByteBuffer mappedByteBuffer, int workerCount) {
+    assertThat(adminWsClient.ce().workerCount())
+      .extracting(WsCe.WorkerCountResponse::getValue, WsCe.WorkerCountResponse::getCanSetWorkerCount)
+      .containsOnly(workerCount, true);
+
+    blockAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer);
+
+    // start analysis of workerCount + 2 projects
+    List<String> projectKeys = IntStream.range(0, workerCount + 2).mapToObj(i -> "prj" + i).collect(toList());
+    for (String projectKey : projectKeys) {
+      SonarScanner sonarRunner = SonarScanner.create(ItUtils.projectDir("shared/xoo-sample"))
+        .setProperties("sonar.projectKey", projectKey);
+      orchestrator.executeBuild(sonarRunner, false);
+    }
+
+    List<WsCe.Task> tasksList = waitForWsCallStatus(
+      this::getTasksAllTasks,
+      (tasks) -> verifyInProgressTaskCount(tasks, workerCount));
+
+    assertThat(tasksList.stream()
+      .filter(ConcurrentCeWorkersTest::pending)
+      .map(WsCe.Task::getComponentKey)
+      .collect(toSet()))
+        .isEqualTo(copyOf(projectKeys.subList(workerCount, projectKeys.size())));
+    assertThat(tasksList.stream()
+      .filter(ConcurrentCeWorkersTest::inProgress)
+      .map(WsCe.Task::getComponentKey)
+      .collect(toSet()))
+        .isEqualTo(copyOf(projectKeys.subList(0, workerCount)));
+
+    releaseAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer);
+
+    waitForWsCallStatus(this::getTasksAllTasks, List::isEmpty);
+  }
+
+  private static MappedByteBuffer initMappedByteBuffer(RandomAccessFile randomAccessFile) throws IOException {
+    return randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1);
+  }
+
+  private void releaseAnyAnalysisWithFakeGovernancePlugin(MappedByteBuffer mappedByteBuffer) {
+    // let the blocked analyses finish running
+    mappedByteBuffer.put(0, UNLATCHED);
+  }
+
+  private static void blockAnyAnalysisWithFakeGovernancePlugin(MappedByteBuffer mappedByteBuffer) {
+    // block any analysis which will run with the fake-governance-plugin
+    mappedByteBuffer.put(0, BLOCKING);
+  }
+
+  private void close(@Nullable RandomAccessFile randomAccessFile) throws IOException {
+    if (randomAccessFile != null) {
+      randomAccessFile.close();
+    }
+  }
+
+  private static boolean verifyInProgressTaskCount(List<WsCe.Task> tasksList, int workerCount) {
+    return tasksList.stream().filter(ConcurrentCeWorkersTest::inProgress).count() >= workerCount;
+  }
+
+  private static boolean pending(WsCe.Task task) {
+    return WsCe.TaskStatus.PENDING == task.getStatus();
+  }
+
+  private static boolean inProgress(WsCe.Task task) {
+    return WsCe.TaskStatus.IN_PROGRESS == task.getStatus();
+  }
+
+  private List<WsCe.Task> getTasksAllTasks(WsClient wsClient) {
+    return wsClient.ce().activity(new ActivityWsRequest()
+      .setStatus(ImmutableList.of(STATUS_PENDING, STATUS_IN_PROGRESS)))
+      .getTasksList();
+  }
+
+  private <T> T waitForWsCallStatus(Function<WsClient, T> call, Predicate<T> test) {
+    WsClient wsClient = ItUtils.newAdminWsClient(orchestrator);
+    int i = 0;
+    T returnValue = call.apply(wsClient);
+    boolean expectedState = test.test(returnValue);
+    while (i < MAX_WAIT_LOOP && !expectedState) {
+      waitInterruptedly();
+      i++;
+      returnValue = call.apply(wsClient);
+      expectedState = test.test(returnValue);
+    }
+    assertThat(expectedState)
+      .as("Failed to wait for expected queue status. Last call returned:\n%s", returnValue)
+      .isTrue();
+    return returnValue;
+  }
+
+  private static void waitInterruptedly() {
+    try {
+      Thread.sleep(WAIT);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+}