From: Julien Lancelot Date: Mon, 19 Nov 2018 09:46:54 +0000 (+0100) Subject: Extract concurrent ce worker test to its own class test X-Git-Tag: 6.7.6~6 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=5e45d1322ed79da2ec30934276e326e3ae9273b3;p=sonarqube.git Extract concurrent ce worker test to its own class test When tests are in the same class, a Deadlock is generated on MySQL : ### Error updating database. Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction ### The error may involve org.sonar.db.ce.CeActivityMapper.insert-Inline ### The error occurred while setting parameters ### SQL: insert into ce_activity ( uuid, component_uuid, analysis_uuid, status, task_type, is_last, is_last_key, submitter_login, submitted_at, worker_uuid, execution_count, started_at, executed_at, created_at, updated_at, execution_time_ms, error_message, error_stacktrace, error_type ) values ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ) ### Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction at org.apache.ibatis.exceptions.ExceptionFactory.wrapException(ExceptionFactory.java:30) at org.apache.ibatis.session.defaults.DefaultSqlSession.update(DefaultSqlSession.java:200) at org.apache.ibatis.session.defaults.DefaultSqlSession.insert(DefaultSqlSession.java:185) at org.apache.ibatis.binding.MapperMethod.execute(MapperMethod.java:57) at org.apache.ibatis.binding.MapperProxy.invoke(MapperProxy.java:59) at com.sun.proxy.$Proxy22.insert(Unknown Source) at org.sonar.db.ce.CeActivityDao.insert(CeActivityDao.java:55) at org.sonar.ce.queue.CeQueueImpl.remove(CeQueueImpl.java:156) at org.sonar.ce.queue.InternalCeQueueImpl.remove(InternalCeQueueImpl.java:110) at org.sonar.ce.taskprocessor.CeWorkerImpl.finalizeTask(CeWorkerImpl.java:165) at org.sonar.ce.taskprocessor.CeWorkerImpl.executeTask(CeWorkerImpl.java:148) at org.sonar.ce.taskprocessor.CeWorkerImpl.findAndProcessTask(CeWorkerImpl.java:97) at org.sonar.ce.taskprocessor.CeWorkerImpl.withCustomizedThreadName(CeWorkerImpl.java:81) at org.sonar.ce.taskprocessor.CeWorkerImpl.call(CeWorkerImpl.java:73) at org.sonar.ce.taskprocessor.CeWorkerImpl.call(CeWorkerImpl.java:43) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) --- diff --git a/tests/src/test/java/org/sonarqube/tests/Category5Suite.java b/tests/src/test/java/org/sonarqube/tests/Category5Suite.java index 18209ae65d0..369ba0ff4a4 100644 --- a/tests/src/test/java/org/sonarqube/tests/Category5Suite.java +++ b/tests/src/test/java/org/sonarqube/tests/Category5Suite.java @@ -25,6 +25,7 @@ import org.sonarqube.tests.analysis.AnalysisEsResilienceTest; import org.sonarqube.tests.authorisation.SystemPasscodeTest; import org.sonarqube.tests.ce.CeShutdownTest; import org.sonarqube.tests.ce.CeWorkersTest; +import org.sonarqube.tests.ce.ConcurrentCeWorkersTest; import org.sonarqube.tests.issue.IssueCreationDatePluginChangedTest; import org.sonarqube.tests.marketplace.UpdateCenterTest; import org.sonarqube.tests.qualityProfile.ActiveRuleEsResilienceTest; @@ -70,6 +71,7 @@ import org.sonarqube.tests.user.UserEsResilienceTest; // ce CeShutdownTest.class, CeWorkersTest.class, + ConcurrentCeWorkersTest.class, // issues IssueCreationDatePluginChangedTest.class, diff --git a/tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java b/tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java index 64d0625e192..ffbd8a408ca 100644 --- a/tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java +++ b/tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java @@ -23,67 +23,34 @@ import com.google.common.collect.ImmutableList; import com.sonar.orchestrator.Orchestrator; import com.sonar.orchestrator.OrchestratorBuilder; import com.sonar.orchestrator.build.SonarScanner; -import com.sonar.orchestrator.http.HttpMethod; -import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.Files; -import java.util.List; -import java.util.Random; -import java.util.Set; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import javax.annotation.Nullable; -import org.junit.After; import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.sonarqube.ws.WsCe; import org.sonarqube.ws.client.PostRequest; import org.sonarqube.ws.client.WsClient; import org.sonarqube.ws.client.ce.ActivityWsRequest; import util.ItUtils; -import static com.google.common.collect.ImmutableSet.copyOf; -import static java.lang.String.valueOf; -import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toSet; import static org.assertj.core.api.Assertions.assertThat; import static util.ItUtils.newAdminWsClient; import static util.ItUtils.pluginArtifact; import static util.ItUtils.xooPlugin; public class CeWorkersTest { - private static final int WAIT = 200; // ms - private static final int MAX_WAIT_LOOP = 5 * 60 * 5; // 5 minutes - - private static final byte BLOCKING = (byte) 0x00; - private static final byte UNLATCHED = (byte) 0x01; - - private static final String STATUS_PENDING = "PENDING"; - private static final String STATUS_IN_PROGRESS = "IN_PROGRESS"; @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); - private static File sharedMemory; private static Orchestrator orchestrator; private static WsClient adminWsClient; @BeforeClass public static void setUp() throws Exception { - sharedMemory = temporaryFolder.newFile(); - OrchestratorBuilder builder = Orchestrator.builderEnv() .addPlugin(pluginArtifact("fake-governance-plugin")) - .setServerProperty("fakeGoverance.workerLatch.sharedMemoryFile", sharedMemory.getAbsolutePath()) // overwrite default value to display heap dump on OOM and reduce max heap .setServerProperty("sonar.ce.javaOpts", "-Xmx256m -Xms128m") .addPlugin(xooPlugin()); @@ -101,28 +68,6 @@ public class CeWorkersTest { } } - @Before - public void setup() throws Exception { - unlockWorkersAndResetWorkerCount(); - } - - @After - public void tearDown() throws Exception { - unlockWorkersAndResetWorkerCount(); - } - - private void unlockWorkersAndResetWorkerCount() throws IOException { - RandomAccessFile randomAccessFile = null; - try { - randomAccessFile = new RandomAccessFile(sharedMemory, "rw"); - MappedByteBuffer mappedByteBuffer = initMappedByteBuffer(randomAccessFile); - releaseAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer); - updateWorkerCount(1); - } finally { - close(randomAccessFile); - } - } - @Test public void ce_worker_is_resilient_to_OOM_and_ISE_during_processing_of_a_task() throws InterruptedException { submitFakeTask("OOM"); @@ -240,145 +185,6 @@ public class CeWorkersTest { .failIfNotSuccessful(); } - @Test - public void enabled_worker_count_is_initially_1_and_can_be_changed_dynamically_by_plugin() throws IOException { - assertThat(Files.lines(orchestrator.getServer().getCeLogs().toPath()) - .filter(s -> s.contains("Compute Engine will use "))) - .isEmpty(); - - RandomAccessFile randomAccessFile = null; - try { - randomAccessFile = new RandomAccessFile(sharedMemory, "rw"); - MappedByteBuffer mappedByteBuffer = initMappedByteBuffer(randomAccessFile); - - verifyAnalysesRunInParallel(mappedByteBuffer, 1); - - /* 4 <= newWorkerCount <= 7 */ - int newWorkerCount = 4 + new Random().nextInt(4); - updateWorkerCount(newWorkerCount); - - Set line = Files.lines(orchestrator.getServer().getCeLogs().toPath()) - .filter(s -> s.contains("Compute Engine will use ")) - .collect(Collectors.toSet()); - assertThat(line).hasSize(1); - assertThat(line.iterator().next()).contains(valueOf(newWorkerCount)); - - verifyAnalysesRunInParallel(mappedByteBuffer, newWorkerCount); - - int lowerWorkerCount = 3; - updateWorkerCount(lowerWorkerCount); - verifyAnalysesRunInParallel(mappedByteBuffer, lowerWorkerCount); - } finally { - close(randomAccessFile); - } - } - - private void updateWorkerCount(int newWorkerCount) { - orchestrator.getServer() - .newHttpCall("api/ce/refreshWorkerCount") - .setMethod(HttpMethod.POST) - .setParam("count", valueOf(newWorkerCount)) - .execute(); - } - - private void verifyAnalysesRunInParallel(MappedByteBuffer mappedByteBuffer, int workerCount) { - assertThat(adminWsClient.ce().workerCount()) - .extracting(WsCe.WorkerCountResponse::getValue, WsCe.WorkerCountResponse::getCanSetWorkerCount) - .containsOnly(workerCount, true); - - blockAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer); - - // start analysis of workerCount + 2 projects - List projectKeys = IntStream.range(0, workerCount + 2).mapToObj(i -> "prj" + i).collect(toList()); - for (String projectKey : projectKeys) { - SonarScanner sonarRunner = SonarScanner.create(ItUtils.projectDir("shared/xoo-sample")) - .setProperties("sonar.projectKey", projectKey); - orchestrator.executeBuild(sonarRunner, false); - } - - List tasksList = waitForWsCallStatus( - this::getTasksAllTasks, - (tasks) -> verifyInProgressTaskCount(tasks, workerCount)); - - assertThat(tasksList.stream() - .filter(CeWorkersTest::pending) - .map(WsCe.Task::getComponentKey) - .collect(toSet())) - .isEqualTo(copyOf(projectKeys.subList(workerCount, projectKeys.size()))); - assertThat(tasksList.stream() - .filter(CeWorkersTest::inProgress) - .map(WsCe.Task::getComponentKey) - .collect(toSet())) - .isEqualTo(copyOf(projectKeys.subList(0, workerCount))); - - releaseAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer); - - waitForWsCallStatus(this::getTasksAllTasks, List::isEmpty); - } - - private static MappedByteBuffer initMappedByteBuffer(RandomAccessFile randomAccessFile) throws IOException { - return randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1); - } - - private void releaseAnyAnalysisWithFakeGovernancePlugin(MappedByteBuffer mappedByteBuffer) { - // let the blocked analyses finish running - mappedByteBuffer.put(0, UNLATCHED); - } - - private static void blockAnyAnalysisWithFakeGovernancePlugin(MappedByteBuffer mappedByteBuffer) { - // block any analysis which will run with the fake-governance-plugin - mappedByteBuffer.put(0, BLOCKING); - } - - private void close(@Nullable RandomAccessFile randomAccessFile) throws IOException { - if (randomAccessFile != null) { - randomAccessFile.close(); - } - } - - private static boolean verifyInProgressTaskCount(List tasksList, int workerCount) { - return tasksList.stream().filter(CeWorkersTest::inProgress).count() >= workerCount; - } - - private static boolean pending(WsCe.Task task) { - return WsCe.TaskStatus.PENDING == task.getStatus(); - } - - private static boolean inProgress(WsCe.Task task) { - return WsCe.TaskStatus.IN_PROGRESS == task.getStatus(); - } - - private List getTasksAllTasks(WsClient wsClient) { - return wsClient.ce().activity(new ActivityWsRequest() - .setStatus(ImmutableList.of(STATUS_PENDING, STATUS_IN_PROGRESS))) - .getTasksList(); - } - - private T waitForWsCallStatus(Function call, Predicate test) { - WsClient wsClient = ItUtils.newAdminWsClient(orchestrator); - int i = 0; - T returnValue = call.apply(wsClient); - boolean expectedState = test.test(returnValue); - while (i < MAX_WAIT_LOOP && !expectedState) { - waitInterruptedly(); - i++; - returnValue = call.apply(wsClient); - expectedState = test.test(returnValue); - } - assertThat(expectedState) - .as("Failed to wait for expected queue status. Last call returned:\n%s", returnValue) - .isTrue(); - return returnValue; - } - - private static void waitInterruptedly() { - try { - Thread.sleep(WAIT); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - private void waitForEmptyQueue() throws InterruptedException { int delay = 200; int timeout = 5 * 10; // 10 seconds diff --git a/tests/src/test/java/org/sonarqube/tests/ce/ConcurrentCeWorkersTest.java b/tests/src/test/java/org/sonarqube/tests/ce/ConcurrentCeWorkersTest.java new file mode 100644 index 00000000000..1173fe05f80 --- /dev/null +++ b/tests/src/test/java/org/sonarqube/tests/ce/ConcurrentCeWorkersTest.java @@ -0,0 +1,262 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonarqube.tests.ce; + +import com.google.common.collect.ImmutableList; +import com.sonar.orchestrator.Orchestrator; +import com.sonar.orchestrator.OrchestratorBuilder; +import com.sonar.orchestrator.build.SonarScanner; +import com.sonar.orchestrator.http.HttpMethod; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import javax.annotation.Nullable; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.sonarqube.ws.WsCe; +import org.sonarqube.ws.client.WsClient; +import org.sonarqube.ws.client.ce.ActivityWsRequest; +import util.ItUtils; + +import static com.google.common.collect.ImmutableSet.copyOf; +import static java.lang.String.valueOf; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; +import static org.assertj.core.api.Assertions.assertThat; +import static util.ItUtils.newAdminWsClient; +import static util.ItUtils.pluginArtifact; +import static util.ItUtils.xooPlugin; + +public class ConcurrentCeWorkersTest { + private static final int WAIT = 200; // ms + private static final int MAX_WAIT_LOOP = 5 * 60 * 5; // 5 minutes + + private static final byte BLOCKING = (byte) 0x00; + private static final byte UNLATCHED = (byte) 0x01; + + private static final String STATUS_PENDING = "PENDING"; + private static final String STATUS_IN_PROGRESS = "IN_PROGRESS"; + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File sharedMemory; + private static Orchestrator orchestrator; + private static WsClient adminWsClient; + + @BeforeClass + public static void setUp() throws Exception { + sharedMemory = temporaryFolder.newFile(); + + OrchestratorBuilder builder = Orchestrator.builderEnv() + .addPlugin(pluginArtifact("fake-governance-plugin")) + .setServerProperty("fakeGoverance.workerLatch.sharedMemoryFile", sharedMemory.getAbsolutePath()) + .addPlugin(xooPlugin()); + orchestrator = builder.build(); + orchestrator.start(); + + adminWsClient = newAdminWsClient(orchestrator); + } + + @AfterClass + public static void stop() { + if (orchestrator != null) { + orchestrator.stop(); + orchestrator = null; + } + } + + @Before + public void setup() throws Exception { + unlockWorkersAndResetWorkerCount(); + } + + @After + public void tearDown() throws Exception { + unlockWorkersAndResetWorkerCount(); + } + + private void unlockWorkersAndResetWorkerCount() throws IOException { + RandomAccessFile randomAccessFile = null; + try { + randomAccessFile = new RandomAccessFile(sharedMemory, "rw"); + MappedByteBuffer mappedByteBuffer = initMappedByteBuffer(randomAccessFile); + releaseAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer); + updateWorkerCount(1); + } finally { + close(randomAccessFile); + } + } + + @Test + public void enabled_worker_count_is_initially_1_and_can_be_changed_dynamically_by_plugin() throws IOException { + assertThat(Files.lines(orchestrator.getServer().getCeLogs().toPath()) + .filter(s -> s.contains("Compute Engine will use "))) + .isEmpty(); + + RandomAccessFile randomAccessFile = null; + try { + randomAccessFile = new RandomAccessFile(sharedMemory, "rw"); + MappedByteBuffer mappedByteBuffer = initMappedByteBuffer(randomAccessFile); + + verifyAnalysesRunInParallel(mappedByteBuffer, 1); + + /* 4 <= newWorkerCount <= 7 */ + int newWorkerCount = 4 + new Random().nextInt(4); + updateWorkerCount(newWorkerCount); + + Set line = Files.lines(orchestrator.getServer().getCeLogs().toPath()) + .filter(s -> s.contains("Compute Engine will use ")) + .collect(Collectors.toSet()); + assertThat(line).hasSize(1); + assertThat(line.iterator().next()).contains(valueOf(newWorkerCount)); + + verifyAnalysesRunInParallel(mappedByteBuffer, newWorkerCount); + + int lowerWorkerCount = 3; + updateWorkerCount(lowerWorkerCount); + verifyAnalysesRunInParallel(mappedByteBuffer, lowerWorkerCount); + } finally { + close(randomAccessFile); + } + } + + private void updateWorkerCount(int newWorkerCount) { + orchestrator.getServer() + .newHttpCall("api/ce/refreshWorkerCount") + .setMethod(HttpMethod.POST) + .setParam("count", valueOf(newWorkerCount)) + .execute(); + } + + private void verifyAnalysesRunInParallel(MappedByteBuffer mappedByteBuffer, int workerCount) { + assertThat(adminWsClient.ce().workerCount()) + .extracting(WsCe.WorkerCountResponse::getValue, WsCe.WorkerCountResponse::getCanSetWorkerCount) + .containsOnly(workerCount, true); + + blockAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer); + + // start analysis of workerCount + 2 projects + List projectKeys = IntStream.range(0, workerCount + 2).mapToObj(i -> "prj" + i).collect(toList()); + for (String projectKey : projectKeys) { + SonarScanner sonarRunner = SonarScanner.create(ItUtils.projectDir("shared/xoo-sample")) + .setProperties("sonar.projectKey", projectKey); + orchestrator.executeBuild(sonarRunner, false); + } + + List tasksList = waitForWsCallStatus( + this::getTasksAllTasks, + (tasks) -> verifyInProgressTaskCount(tasks, workerCount)); + + assertThat(tasksList.stream() + .filter(ConcurrentCeWorkersTest::pending) + .map(WsCe.Task::getComponentKey) + .collect(toSet())) + .isEqualTo(copyOf(projectKeys.subList(workerCount, projectKeys.size()))); + assertThat(tasksList.stream() + .filter(ConcurrentCeWorkersTest::inProgress) + .map(WsCe.Task::getComponentKey) + .collect(toSet())) + .isEqualTo(copyOf(projectKeys.subList(0, workerCount))); + + releaseAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer); + + waitForWsCallStatus(this::getTasksAllTasks, List::isEmpty); + } + + private static MappedByteBuffer initMappedByteBuffer(RandomAccessFile randomAccessFile) throws IOException { + return randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1); + } + + private void releaseAnyAnalysisWithFakeGovernancePlugin(MappedByteBuffer mappedByteBuffer) { + // let the blocked analyses finish running + mappedByteBuffer.put(0, UNLATCHED); + } + + private static void blockAnyAnalysisWithFakeGovernancePlugin(MappedByteBuffer mappedByteBuffer) { + // block any analysis which will run with the fake-governance-plugin + mappedByteBuffer.put(0, BLOCKING); + } + + private void close(@Nullable RandomAccessFile randomAccessFile) throws IOException { + if (randomAccessFile != null) { + randomAccessFile.close(); + } + } + + private static boolean verifyInProgressTaskCount(List tasksList, int workerCount) { + return tasksList.stream().filter(ConcurrentCeWorkersTest::inProgress).count() >= workerCount; + } + + private static boolean pending(WsCe.Task task) { + return WsCe.TaskStatus.PENDING == task.getStatus(); + } + + private static boolean inProgress(WsCe.Task task) { + return WsCe.TaskStatus.IN_PROGRESS == task.getStatus(); + } + + private List getTasksAllTasks(WsClient wsClient) { + return wsClient.ce().activity(new ActivityWsRequest() + .setStatus(ImmutableList.of(STATUS_PENDING, STATUS_IN_PROGRESS))) + .getTasksList(); + } + + private T waitForWsCallStatus(Function call, Predicate test) { + WsClient wsClient = ItUtils.newAdminWsClient(orchestrator); + int i = 0; + T returnValue = call.apply(wsClient); + boolean expectedState = test.test(returnValue); + while (i < MAX_WAIT_LOOP && !expectedState) { + waitInterruptedly(); + i++; + returnValue = call.apply(wsClient); + expectedState = test.test(returnValue); + } + assertThat(expectedState) + .as("Failed to wait for expected queue status. Last call returned:\n%s", returnValue) + .isTrue(); + return returnValue; + } + + private static void waitInterruptedly() { + try { + Thread.sleep(WAIT); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + +}