]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-12686 use Jetty for TCP ElasticSearch transport in unit tests
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Wed, 8 Jan 2020 09:36:49 +0000 (10:36 +0100)
committersonartech <sonartech@sonarsource.com>
Thu, 5 Nov 2020 20:06:11 +0000 (20:06 +0000)
LocalTransport is dropped in ElasticSearch 7.X

server/sonar-server-common/src/testFixtures/java/org/elasticsearch/transport/MockTcpTransport.java [deleted file]
server/sonar-server-common/src/testFixtures/java/org/sonar/server/es/EsTester.java

diff --git a/server/sonar-server-common/src/testFixtures/java/org/elasticsearch/transport/MockTcpTransport.java b/server/sonar-server-common/src/testFixtures/java/org/elasticsearch/transport/MockTcpTransport.java
deleted file mode 100644 (file)
index 61ddd5b..0000000
+++ /dev/null
@@ -1,495 +0,0 @@
-/*
- * SonarQube
- * Copyright (C) 2009-2020 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.elasticsearch.transport;
-
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.elasticsearch.Version;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.cli.SuppressForbidden;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.concurrent.CompletableContext;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.common.io.stream.InputStreamStreamInput;
-import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
-import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.network.NetworkService;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.CancellableThreads;
-import org.elasticsearch.common.util.PageCacheRecycler;
-import org.elasticsearch.common.util.concurrent.AbstractRunnable;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.core.internal.io.IOUtils;
-import org.elasticsearch.indices.breaker.CircuitBreakerService;
-import org.elasticsearch.mocksocket.MockServerSocket;
-import org.elasticsearch.mocksocket.MockSocket;
-import org.elasticsearch.threadpool.ThreadPool;
-
-/**
- * This is a socket based blocking TcpTransport implementation that is used for tests
- * that need real networking. This implementation is a test only implementation that implements
- * the networking layer in the worst possible way since it blocks and uses a thread per request model.
- */
-public class MockTcpTransport extends TcpTransport {
-  private static final Logger logger = LogManager.getLogger(MockTcpTransport.class);
-
-  /**
-   * A pre-built light connection profile that shares a single connection across all
-   * types.
-   */
-  static final ConnectionProfile LIGHT_PROFILE;
-
-  private final Set<MockChannel> openChannels = new HashSet<>();
-
-  static {
-    ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
-    builder.addConnections(1,
-      TransportRequestOptions.Type.BULK,
-      TransportRequestOptions.Type.PING,
-      TransportRequestOptions.Type.RECOVERY,
-      TransportRequestOptions.Type.REG,
-      TransportRequestOptions.Type.STATE);
-    LIGHT_PROFILE = builder.build();
-  }
-
-  private final ExecutorService executor;
-
-  public MockTcpTransport(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
-                          CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
-                          NetworkService networkService) {
-    this(settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService, Version.CURRENT);
-  }
-
-  public MockTcpTransport(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
-                          CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
-                          NetworkService networkService, Version mockVersion) {
-    super("mock-tcp-transport", settings, mockVersion, threadPool, PageCacheRecycler.NON_RECYCLING_INSTANCE, circuitBreakerService,
-      namedWriteableRegistry, networkService);
-    // we have our own crazy cached threadpool this one is not bounded at all...
-    // using the ES thread factory here is crucial for tests otherwise disruption tests won't block that thread
-    executor = Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, Transports.TEST_MOCK_TRANSPORT_THREAD_PREFIX));
-  }
-
-  @Override
-  protected MockChannel bind(final String name, InetSocketAddress address) throws IOException {
-    MockServerSocket socket = new MockServerSocket();
-    socket.setReuseAddress(TransportSettings.TCP_REUSE_ADDRESS.get(settings));
-    ByteSizeValue tcpReceiveBufferSize = TransportSettings.TCP_RECEIVE_BUFFER_SIZE.get(settings);
-    if (tcpReceiveBufferSize.getBytes() > 0) {
-      socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt());
-    }
-    socket.bind(address);
-    MockChannel serverMockChannel = new MockChannel(socket, name);
-    CountDownLatch started = new CountDownLatch(1);
-    executor.execute(new AbstractRunnable() {
-      @Override
-      public void onFailure(Exception e) {
-        onException(serverMockChannel, e);
-      }
-
-      @Override
-      protected void doRun() throws Exception {
-        started.countDown();
-        serverMockChannel.accept(executor);
-      }
-    });
-    try {
-      started.await();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-    return serverMockChannel;
-  }
-
-  private void readMessage(MockChannel mockChannel, StreamInput input) throws IOException {
-    Socket socket = mockChannel.activeChannel;
-    byte[] minimalHeader = new byte[TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE];
-    try {
-      input.readFully(minimalHeader);
-    } catch (EOFException eof) {
-      throw new IOException("Connection reset by peer");
-    }
-
-    // Read message length will throw stream corrupted exception if the marker bytes incorrect
-    int msgSize = TcpTransport.readMessageLength(new BytesArray(minimalHeader));
-    if (msgSize == -1) {
-      socket.getOutputStream().flush();
-    } else {
-      final byte[] buffer = new byte[msgSize];
-      input.readFully(buffer);
-      int expectedSize = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE + msgSize;
-      try (BytesStreamOutput output = new ReleasableBytesStreamOutput(expectedSize, bigArrays)) {
-        output.write(minimalHeader);
-        output.write(buffer);
-        consumeNetworkReads(mockChannel, output.bytes());
-      }
-    }
-  }
-
-  @Override
-  @SuppressForbidden(reason = "real socket for mocking remote connections")
-  protected MockChannel initiateChannel(DiscoveryNode node) throws IOException {
-    InetSocketAddress address = node.getAddress().address();
-    final MockSocket socket = new MockSocket();
-    final MockChannel channel = new MockChannel(socket, address, false, "none");
-
-    boolean success = false;
-    try {
-      configureSocket(socket);
-      success = true;
-    } finally {
-      if (!success) {
-        IOUtils.close(socket);
-      }
-    }
-
-    executor.submit(() -> {
-      try {
-        socket.connect(address);
-        socket.setSoLinger(false, 0);
-        channel.connectFuture.complete(null);
-        channel.loopRead(executor);
-      } catch (Exception ex) {
-        channel.connectFuture.completeExceptionally(ex);
-      }
-    });
-
-    return channel;
-  }
-
-  @Override
-  protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile connectionProfile) {
-    ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
-    Set<TransportRequestOptions.Type> allTypesWithConnection = new HashSet<>();
-    Set<TransportRequestOptions.Type> allTypesWithoutConnection = new HashSet<>();
-    for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile.getHandles()) {
-      Set<TransportRequestOptions.Type> types = handle.getTypes();
-      if (handle.length > 0) {
-        allTypesWithConnection.addAll(types);
-      } else {
-        allTypesWithoutConnection.addAll(types);
-      }
-    }
-    // make sure we maintain at least the types that are supported by this profile even if we only use a single channel for them.
-    builder.addConnections(1, allTypesWithConnection.toArray(new TransportRequestOptions.Type[0]));
-    if (!allTypesWithoutConnection.isEmpty()) {
-      builder.addConnections(0, allTypesWithoutConnection.toArray(new TransportRequestOptions.Type[0]));
-    }
-    builder.setHandshakeTimeout(connectionProfile.getHandshakeTimeout());
-    builder.setConnectTimeout(connectionProfile.getConnectTimeout());
-    builder.setPingInterval(connectionProfile.getPingInterval());
-    builder.setCompressionEnabled(connectionProfile.getCompressionEnabled());
-    return builder.build();
-  }
-
-  private void configureSocket(Socket socket) throws SocketException {
-    socket.setTcpNoDelay(TransportSettings.TCP_NO_DELAY.get(settings));
-    ByteSizeValue tcpSendBufferSize = TransportSettings.TCP_SEND_BUFFER_SIZE.get(settings);
-    if (tcpSendBufferSize.getBytes() > 0) {
-      socket.setSendBufferSize(tcpSendBufferSize.bytesAsInt());
-    }
-    ByteSizeValue tcpReceiveBufferSize = TransportSettings.TCP_RECEIVE_BUFFER_SIZE.get(settings);
-    if (tcpReceiveBufferSize.getBytes() > 0) {
-      socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt());
-    }
-    socket.setReuseAddress(TransportSettings.TCP_REUSE_ADDRESS.get(settings));
-  }
-
-  public final class MockChannel implements Closeable, TcpChannel, TcpServerChannel {
-    private final AtomicBoolean isOpen = new AtomicBoolean(true);
-    private final InetSocketAddress localAddress;
-    private final ServerSocket serverSocket;
-    private final Set<MockChannel> workerChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
-    private final Socket activeChannel;
-    private final boolean isServer;
-    private final String profile;
-    private final CancellableThreads cancellableThreads = new CancellableThreads();
-    private final CompletableContext<Void> closeFuture = new CompletableContext<>();
-    private final CompletableContext<Void> connectFuture = new CompletableContext<>();
-    private final ChannelStats stats = new ChannelStats();
-
-    /**
-     * Constructs a new MockChannel instance intended for handling the actual incoming / outgoing traffic.
-     *
-     * @param socket The client socket. Mut not be null.
-     * @param localAddress Address associated with the corresponding local server socket. Must not be null.
-     * @param profile The associated profile name.
-     */
-    MockChannel(Socket socket, InetSocketAddress localAddress, boolean isServer, String profile) {
-      this.localAddress = localAddress;
-      this.activeChannel = socket;
-      this.isServer = isServer;
-      this.serverSocket = null;
-      this.profile = profile;
-      synchronized (openChannels) {
-        openChannels.add(this);
-      }
-    }
-
-    /**
-     * Constructs a new MockChannel instance intended for accepting requests.
-     *
-     * @param serverSocket The associated server socket. Must not be null.
-     * @param profile The associated profile name.
-     */
-    MockChannel(ServerSocket serverSocket, String profile) {
-      this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
-      this.serverSocket = serverSocket;
-      this.profile = profile;
-      this.isServer = false;
-      this.activeChannel = null;
-      synchronized (openChannels) {
-        openChannels.add(this);
-      }
-    }
-
-    public void accept(Executor executor) throws IOException {
-      while (isOpen.get()) {
-        Socket incomingSocket = serverSocket.accept();
-        MockChannel incomingChannel = null;
-        try {
-          configureSocket(incomingSocket);
-          synchronized (this) {
-            if (isOpen.get()) {
-              InetSocketAddress localAddress = new InetSocketAddress(incomingSocket.getLocalAddress(),
-                incomingSocket.getPort());
-              incomingChannel = new MockChannel(incomingSocket, localAddress, true, profile);
-              MockChannel finalIncomingChannel = incomingChannel;
-              incomingChannel.addCloseListener(new ActionListener<Void>() {
-                @Override
-                public void onResponse(Void aVoid) {
-                  workerChannels.remove(finalIncomingChannel);
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                  workerChannels.remove(finalIncomingChannel);
-                }
-              });
-              serverAcceptedChannel(incomingChannel);
-              //establish a happens-before edge between closing and accepting a new connection
-              workerChannels.add(incomingChannel);
-
-              // this spawns a new thread immediately, so OK under lock
-              incomingChannel.loopRead(executor);
-              // the channel is properly registered and will be cleared by the close code.
-              incomingSocket = null;
-              incomingChannel = null;
-            }
-          }
-        } finally {
-          // ensure we don't leak sockets and channels in the failure case. Note that we null both
-          // if there are no exceptions so this becomes a no op.
-          IOUtils.closeWhileHandlingException(incomingSocket, incomingChannel);
-        }
-      }
-    }
-
-    void loopRead(Executor executor) {
-      executor.execute(new AbstractRunnable() {
-        @Override
-        public void onFailure(Exception e) {
-          if (isOpen.get()) {
-            try {
-              onException(MockChannel.this, e);
-            } catch (Exception ex) {
-              logger.warn("failed on handling exception", ex);
-              IOUtils.closeWhileHandlingException(MockChannel.this); // pure paranoia
-            }
-          }
-        }
-
-        @Override
-        protected void doRun() throws Exception {
-          StreamInput input = new InputStreamStreamInput(new BufferedInputStream(activeChannel.getInputStream()));
-          // There is a (slim) chance that we get interrupted right after a loop iteration, so check explicitly
-          while (isOpen.get() && !Thread.currentThread().isInterrupted()) {
-            cancellableThreads.executeIO(() -> readMessage(MockChannel.this, input));
-          }
-        }
-      });
-    }
-
-    synchronized void close0() throws IOException {
-      // establish a happens-before edge between closing and accepting a new connection
-      // we have to sync this entire block to ensure that our openChannels checks work correctly.
-      // The close block below will close all worker channels but if one of the worker channels runs into an exception
-      // for instance due to a disconnect the handling of this exception might be executed concurrently.
-      // now if we are in-turn concurrently call close we might not wait for the actual close to happen and that will, down the road
-      // make the assertion trip that not all channels are closed.
-      if (isOpen.compareAndSet(true, false)) {
-        final boolean removedChannel;
-        synchronized (openChannels) {
-          removedChannel = openChannels.remove(this);
-        }
-        IOUtils.close(serverSocket, activeChannel, () -> IOUtils.close(workerChannels),
-          () -> cancellableThreads.cancel("channel closed"));
-        assert removedChannel: "Channel was not removed or removed twice?";
-      }
-    }
-
-    @Override
-    public String toString() {
-      return "MockChannel{" +
-        "profile='" + profile + '\'' +
-        ", isOpen=" + isOpen +
-        ", localAddress=" + localAddress +
-        ", isServerSocket=" + (serverSocket != null) +
-        '}';
-    }
-
-    @Override
-    public void close() {
-      try {
-        close0();
-        closeFuture.complete(null);
-      } catch (IOException e) {
-        closeFuture.completeExceptionally(e);
-      }
-    }
-
-    @Override
-    public String getProfile() {
-      return profile;
-    }
-
-    @Override
-    public boolean isServerChannel() {
-      return isServer;
-    }
-
-    @Override
-    public void addCloseListener(ActionListener<Void> listener) {
-      closeFuture.addListener(ActionListener.toBiConsumer(listener));
-    }
-
-    @Override
-    public void addConnectListener(ActionListener<Void> listener) {
-      connectFuture.addListener(ActionListener.toBiConsumer(listener));
-    }
-
-    @Override
-    public ChannelStats getChannelStats() {
-      return stats;
-    }
-
-    @Override
-    public boolean isOpen() {
-      return isOpen.get();
-    }
-
-    @Override
-    public InetSocketAddress getLocalAddress() {
-      return localAddress;
-    }
-
-    @Override
-    public InetSocketAddress getRemoteAddress() {
-      return (InetSocketAddress) activeChannel.getRemoteSocketAddress();
-    }
-
-    @Override
-    public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
-      try {
-        synchronized (this) {
-          OutputStream outputStream = new BufferedOutputStream(activeChannel.getOutputStream());
-          reference.writeTo(outputStream);
-          outputStream.flush();
-        }
-        listener.onResponse(null);
-      } catch (IOException e) {
-        listener.onFailure(e);
-        onException(this, e);
-      }
-    }
-  }
-
-
-  @Override
-  protected void doStart() {
-    boolean success = false;
-    try {
-      if (NetworkService.NETWORK_SERVER.get(settings)) {
-        // loop through all profiles and start them up, special handling for default one
-        for (ProfileSettings profileSettings : profileSettings) {
-          bindServer(profileSettings);
-        }
-      }
-      super.doStart();
-      success = true;
-    } finally {
-      if (!success) {
-        doStop();
-      }
-    }
-  }
-
-  @Override
-  protected void stopInternal() {
-    ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
-    synchronized (openChannels) {
-      assert openChannels.isEmpty() : "there are still open channels: " + openChannels;
-    }
-  }
-}
index 9fdb1cb3f811023aaa213201f5a62d28e8573e8e..864083777617cf950adad085bd2c413da3e5f923 100644 (file)
@@ -24,16 +24,19 @@ import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.ServerSocket;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.lang.reflect.ConstructorUtils;
@@ -48,31 +51,27 @@ import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.network.NetworkModule;
-import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.discovery.DiscoveryModule;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.http.BindHttpException;
+import org.elasticsearch.http.HttpTransportSettings;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.TermQueryBuilder;
-import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.join.ParentJoinPlugin;
 import org.elasticsearch.node.InternalSettingsPreparer;
 import org.elasticsearch.node.Node;
-import org.elasticsearch.plugins.NetworkPlugin;
-import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.node.NodeValidationException;
 import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.MockTcpTransport;
-import org.elasticsearch.transport.Transport;
+import org.elasticsearch.transport.Netty4Plugin;
 import org.junit.rules.ExternalResource;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
 import org.sonar.server.component.index.ComponentIndexDefinition;
 import org.sonar.server.es.IndexDefinition.IndexDefinitionContext;
 import org.sonar.server.es.IndexType.IndexRelationType;
@@ -84,8 +83,10 @@ import org.sonar.server.rule.index.RuleIndexDefinition;
 import org.sonar.server.user.index.UserIndexDefinition;
 import org.sonar.server.view.index.ViewIndexDefinition;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Lists.newArrayList;
+import static java.lang.String.format;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 import static org.sonar.server.es.Index.ALL_INDICES;
 import static org.sonar.server.es.IndexType.FIELD_INDEX_TYPE;
@@ -93,6 +94,11 @@ import static org.sonar.server.es.newindex.DefaultIndexSettings.REFRESH_IMMEDIAT
 
 public class EsTester extends ExternalResource {
 
+  private static final int MIN_PORT = 1;
+  private static final int MAX_PORT = 49151;
+  private static final int MIN_NON_ROOT_PORT = 1025;
+  private static final Logger LOG = Loggers.get(EsTester.class);
+
   static {
     System.setProperty("log4j.shutdownHookEnabled", "false");
     // we can not shutdown logging when tests are running or the next test that runs within the
@@ -356,49 +362,89 @@ public class EsTester extends ExternalResource {
     try {
       Path tempDir = Files.createTempDirectory("EsTester");
       tempDir.toFile().deleteOnExit();
-      Settings settings = Settings.builder()
-        .put(Environment.PATH_HOME_SETTING.getKey(), tempDir)
-        .put("node.name", "EsTester")
-        .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE)
-        .put("logger.level", "INFO")
-        .put("action.auto_create_index", false)
-        // Default the watermarks to absurdly low to prevent the tests
-        // from failing on nodes without enough disk space
-        .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b")
-        .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b")
-        .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1b")
-        // always reduce this - it can make tests really slow
-        .put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), TimeValue.timeValueMillis(20))
-        .put(NetworkModule.TRANSPORT_TYPE_KEY, "local")
-        .put(NetworkModule.HTTP_ENABLED.getKey(), false)
-        .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "single-node")
-        .build();
-      Node node = new Node(InternalSettingsPreparer.prepareEnvironment(settings, null),
-        ImmutableList.of(
-          CommonAnalysisPlugin.class,
-          // mock local transport plugin
-          MockTcpTransportPlugin.class,
-          // install ParentJoin plugin required to create field of type "join"
-          ParentJoinPlugin.class),
-        true) {
-        @Override
-        protected void registerDerivedNodeNameWithLogger(String nodeName) {
-          // nothing to do
+      int i = 10;
+      while (i > 0) {
+        int httpPort = getNextAvailable();
+        try {
+          Node node = startNode(tempDir, httpPort);
+          LOG.info("EsTester running ElasticSearch on HTTP port {}", httpPort);
+          return node;
+        } catch (BindHttpException e) {
+          i--;
         }
-      };
-      return node.start();
+      }
     } catch (Exception e) {
       throw new IllegalStateException("Fail to start embedded Elasticsearch", e);
     }
+    throw new IllegalStateException("Failed to find an open port to connect EsTester's Elasticsearch instance after 10 attempts");
+  }
+
+  private static Node startNode(Path tempDir, int httpPort) throws NodeValidationException {
+    Settings settings = Settings.builder()
+      .put(Environment.PATH_HOME_SETTING.getKey(), tempDir)
+      .put("node.name", "EsTester")
+      .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE)
+      .put("logger.level", "INFO")
+      .put("action.auto_create_index", false)
+      // Default the watermarks to absurdly low to prevent the tests
+      // from failing on nodes without enough disk space
+      .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b")
+      .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b")
+      .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1b")
+      // always reduce this - it can make tests really slow
+      .put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), TimeValue.timeValueMillis(20))
+      .put(NetworkModule.HTTP_ENABLED.getKey(), true)
+      .put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), httpPort)
+      .put(HttpTransportSettings.SETTING_HTTP_BIND_HOST.getKey(), "localhost")
+      .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "single-node")
+      .build();
+    Node node = new Node(InternalSettingsPreparer.prepareEnvironment(settings, null),
+      ImmutableList.of(
+        CommonAnalysisPlugin.class,
+        // Netty4Plugin provides http and tcp transport
+        Netty4Plugin.class,
+        // install ParentJoin plugin required to create field of type "join"
+        ParentJoinPlugin.class),
+      true) {
+      @Override
+      protected void registerDerivedNodeNameWithLogger(String nodeName) {
+        // nothing to do
+      }
+    };
+    return node.start();
   }
 
-  public static final class MockTcpTransportPlugin extends Plugin implements NetworkPlugin {
-    @Override
-    public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
-      CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
-      return Collections.singletonMap(
-        "local",
-        () -> new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, circuitBreakerService, namedWriteableRegistry, networkService));
+  public static int getNextAvailable() {
+    Random random = new Random();
+    int maxAttempts = 10;
+    int i = maxAttempts;
+    while (i > 0) {
+      int port = MIN_NON_ROOT_PORT + random.nextInt(MAX_PORT - MIN_NON_ROOT_PORT);
+      if (available(port)) {
+        return port;
+      }
+      i--;
     }
+
+    throw new NoSuchElementException(format("Could not find an available port in %s attempts", maxAttempts));
   }
+
+  private static boolean available(int port) {
+    checkArgument(validPort(port), "Invalid port: %s", port);
+
+    try (ServerSocket ss = new ServerSocket(port)) {
+      ss.setReuseAddress(true);
+      try (DatagramSocket ds = new DatagramSocket(port)) {
+        ds.setReuseAddress(true);
+      }
+      return true;
+    } catch (IOException var13) {
+      return false;
+    }
+  }
+
+  private static boolean validPort(int fromPort) {
+    return fromPort >= MIN_PORT && fromPort <= MAX_PORT;
+  }
+
 }