path: root/server
diff options
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>2020-01-08 10:36:49 +0100
committersonartech <sonartech@sonarsource.com>2020-11-05 20:06:11 +0000
commit13825db7112ffc75e5c6d0355a22ddb5eff9c432 (patch)
treea0bb5ac79a9da36af4423b177b391abd0f28fb5d /server
parent74aafb696514932e809fc95d97f4ae1438439900 (diff)
SONAR-12686 use Jetty for TCP ElasticSearch transport in unit tests
LocalTransport is dropped in ElasticSearch 7.X
Diffstat (limited to 'server')
2 files changed, 95 insertions, 544 deletions
diff --git a/server/sonar-server-common/src/testFixtures/java/org/elasticsearch/transport/MockTcpTransport.java b/server/sonar-server-common/src/testFixtures/java/org/elasticsearch/transport/MockTcpTransport.java
deleted file mode 100644
index 61ddd5b2a9d..00000000000
--- a/server/sonar-server-common/src/testFixtures/java/org/elasticsearch/transport/MockTcpTransport.java
+++ /dev/null
@@ -1,495 +0,0 @@
- * SonarQube
- * Copyright (C) 2009-2020 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.elasticsearch.transport;
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.elasticsearch.Version;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.cli.SuppressForbidden;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.concurrent.CompletableContext;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.common.io.stream.InputStreamStreamInput;
-import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
-import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.network.NetworkService;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.CancellableThreads;
-import org.elasticsearch.common.util.PageCacheRecycler;
-import org.elasticsearch.common.util.concurrent.AbstractRunnable;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.core.internal.io.IOUtils;
-import org.elasticsearch.indices.breaker.CircuitBreakerService;
-import org.elasticsearch.mocksocket.MockServerSocket;
-import org.elasticsearch.mocksocket.MockSocket;
-import org.elasticsearch.threadpool.ThreadPool;
- * This is a socket based blocking TcpTransport implementation that is used for tests
- * that need real networking. This implementation is a test only implementation that implements
- * the networking layer in the worst possible way since it blocks and uses a thread per request model.
- */
-public class MockTcpTransport extends TcpTransport {
- private static final Logger logger = LogManager.getLogger(MockTcpTransport.class);
- /**
- * A pre-built light connection profile that shares a single connection across all
- * types.
- */
- static final ConnectionProfile LIGHT_PROFILE;
- private final Set<MockChannel> openChannels = new HashSet<>();
- static {
- ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
- builder.addConnections(1,
- TransportRequestOptions.Type.BULK,
- TransportRequestOptions.Type.PING,
- TransportRequestOptions.Type.RECOVERY,
- TransportRequestOptions.Type.REG,
- TransportRequestOptions.Type.STATE);
- LIGHT_PROFILE = builder.build();
- }
- private final ExecutorService executor;
- public MockTcpTransport(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
- CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
- NetworkService networkService) {
- this(settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService, Version.CURRENT);
- }
- public MockTcpTransport(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
- CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
- NetworkService networkService, Version mockVersion) {
- super("mock-tcp-transport", settings, mockVersion, threadPool, PageCacheRecycler.NON_RECYCLING_INSTANCE, circuitBreakerService,
- namedWriteableRegistry, networkService);
- // we have our own crazy cached threadpool this one is not bounded at all...
- // using the ES thread factory here is crucial for tests otherwise disruption tests won't block that thread
- executor = Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, Transports.TEST_MOCK_TRANSPORT_THREAD_PREFIX));
- }
- @Override
- protected MockChannel bind(final String name, InetSocketAddress address) throws IOException {
- MockServerSocket socket = new MockServerSocket();
- socket.setReuseAddress(TransportSettings.TCP_REUSE_ADDRESS.get(settings));
- ByteSizeValue tcpReceiveBufferSize = TransportSettings.TCP_RECEIVE_BUFFER_SIZE.get(settings);
- if (tcpReceiveBufferSize.getBytes() > 0) {
- socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt());
- }
- socket.bind(address);
- MockChannel serverMockChannel = new MockChannel(socket, name);
- CountDownLatch started = new CountDownLatch(1);
- executor.execute(new AbstractRunnable() {
- @Override
- public void onFailure(Exception e) {
- onException(serverMockChannel, e);
- }
- @Override
- protected void doRun() throws Exception {
- started.countDown();
- serverMockChannel.accept(executor);
- }
- });
- try {
- started.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- return serverMockChannel;
- }
- private void readMessage(MockChannel mockChannel, StreamInput input) throws IOException {
- Socket socket = mockChannel.activeChannel;
- byte[] minimalHeader = new byte[TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE];
- try {
- input.readFully(minimalHeader);
- } catch (EOFException eof) {
- throw new IOException("Connection reset by peer");
- }
- // Read message length will throw stream corrupted exception if the marker bytes incorrect
- int msgSize = TcpTransport.readMessageLength(new BytesArray(minimalHeader));
- if (msgSize == -1) {
- socket.getOutputStream().flush();
- } else {
- final byte[] buffer = new byte[msgSize];
- input.readFully(buffer);
- int expectedSize = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE + msgSize;
- try (BytesStreamOutput output = new ReleasableBytesStreamOutput(expectedSize, bigArrays)) {
- output.write(minimalHeader);
- output.write(buffer);
- consumeNetworkReads(mockChannel, output.bytes());
- }
- }
- }
- @Override
- @SuppressForbidden(reason = "real socket for mocking remote connections")
- protected MockChannel initiateChannel(DiscoveryNode node) throws IOException {
- InetSocketAddress address = node.getAddress().address();
- final MockSocket socket = new MockSocket();
- final MockChannel channel = new MockChannel(socket, address, false, "none");
- boolean success = false;
- try {
- configureSocket(socket);
- success = true;
- } finally {
- if (!success) {
- IOUtils.close(socket);
- }
- }
- executor.submit(() -> {
- try {
- socket.connect(address);
- socket.setSoLinger(false, 0);
- channel.connectFuture.complete(null);
- channel.loopRead(executor);
- } catch (Exception ex) {
- channel.connectFuture.completeExceptionally(ex);
- }
- });
- return channel;
- }
- @Override
- protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile connectionProfile) {
- ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
- Set<TransportRequestOptions.Type> allTypesWithConnection = new HashSet<>();
- Set<TransportRequestOptions.Type> allTypesWithoutConnection = new HashSet<>();
- for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile.getHandles()) {
- Set<TransportRequestOptions.Type> types = handle.getTypes();
- if (handle.length > 0) {
- allTypesWithConnection.addAll(types);
- } else {
- allTypesWithoutConnection.addAll(types);
- }
- }
- // make sure we maintain at least the types that are supported by this profile even if we only use a single channel for them.
- builder.addConnections(1, allTypesWithConnection.toArray(new TransportRequestOptions.Type[0]));
- if (!allTypesWithoutConnection.isEmpty()) {
- builder.addConnections(0, allTypesWithoutConnection.toArray(new TransportRequestOptions.Type[0]));
- }
- builder.setHandshakeTimeout(connectionProfile.getHandshakeTimeout());
- builder.setConnectTimeout(connectionProfile.getConnectTimeout());
- builder.setPingInterval(connectionProfile.getPingInterval());
- builder.setCompressionEnabled(connectionProfile.getCompressionEnabled());
- return builder.build();
- }
- private void configureSocket(Socket socket) throws SocketException {
- socket.setTcpNoDelay(TransportSettings.TCP_NO_DELAY.get(settings));
- ByteSizeValue tcpSendBufferSize = TransportSettings.TCP_SEND_BUFFER_SIZE.get(settings);
- if (tcpSendBufferSize.getBytes() > 0) {
- socket.setSendBufferSize(tcpSendBufferSize.bytesAsInt());
- }
- ByteSizeValue tcpReceiveBufferSize = TransportSettings.TCP_RECEIVE_BUFFER_SIZE.get(settings);
- if (tcpReceiveBufferSize.getBytes() > 0) {
- socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt());
- }
- socket.setReuseAddress(TransportSettings.TCP_REUSE_ADDRESS.get(settings));
- }
- public final class MockChannel implements Closeable, TcpChannel, TcpServerChannel {
- private final AtomicBoolean isOpen = new AtomicBoolean(true);
- private final InetSocketAddress localAddress;
- private final ServerSocket serverSocket;
- private final Set<MockChannel> workerChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
- private final Socket activeChannel;
- private final boolean isServer;
- private final String profile;
- private final CancellableThreads cancellableThreads = new CancellableThreads();
- private final CompletableContext<Void> closeFuture = new CompletableContext<>();
- private final CompletableContext<Void> connectFuture = new CompletableContext<>();
- private final ChannelStats stats = new ChannelStats();
- /**
- * Constructs a new MockChannel instance intended for handling the actual incoming / outgoing traffic.
- *
- * @param socket The client socket. Mut not be null.
- * @param localAddress Address associated with the corresponding local server socket. Must not be null.
- * @param profile The associated profile name.
- */
- MockChannel(Socket socket, InetSocketAddress localAddress, boolean isServer, String profile) {
- this.localAddress = localAddress;
- this.activeChannel = socket;
- this.isServer = isServer;
- this.serverSocket = null;
- this.profile = profile;
- synchronized (openChannels) {
- openChannels.add(this);
- }
- }
- /**
- * Constructs a new MockChannel instance intended for accepting requests.
- *
- * @param serverSocket The associated server socket. Must not be null.
- * @param profile The associated profile name.
- */
- MockChannel(ServerSocket serverSocket, String profile) {
- this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
- this.serverSocket = serverSocket;
- this.profile = profile;
- this.isServer = false;
- this.activeChannel = null;
- synchronized (openChannels) {
- openChannels.add(this);
- }
- }
- public void accept(Executor executor) throws IOException {
- while (isOpen.get()) {
- Socket incomingSocket = serverSocket.accept();
- MockChannel incomingChannel = null;
- try {
- configureSocket(incomingSocket);
- synchronized (this) {
- if (isOpen.get()) {
- InetSocketAddress localAddress = new InetSocketAddress(incomingSocket.getLocalAddress(),
- incomingSocket.getPort());
- incomingChannel = new MockChannel(incomingSocket, localAddress, true, profile);
- MockChannel finalIncomingChannel = incomingChannel;
- incomingChannel.addCloseListener(new ActionListener<Void>() {
- @Override
- public void onResponse(Void aVoid) {
- workerChannels.remove(finalIncomingChannel);
- }
- @Override
- public void onFailure(Exception e) {
- workerChannels.remove(finalIncomingChannel);
- }
- });
- serverAcceptedChannel(incomingChannel);
- //establish a happens-before edge between closing and accepting a new connection
- workerChannels.add(incomingChannel);
- // this spawns a new thread immediately, so OK under lock
- incomingChannel.loopRead(executor);
- // the channel is properly registered and will be cleared by the close code.
- incomingSocket = null;
- incomingChannel = null;
- }
- }
- } finally {
- // ensure we don't leak sockets and channels in the failure case. Note that we null both
- // if there are no exceptions so this becomes a no op.
- IOUtils.closeWhileHandlingException(incomingSocket, incomingChannel);
- }
- }
- }
- void loopRead(Executor executor) {
- executor.execute(new AbstractRunnable() {
- @Override
- public void onFailure(Exception e) {
- if (isOpen.get()) {
- try {
- onException(MockChannel.this, e);
- } catch (Exception ex) {
- logger.warn("failed on handling exception", ex);
- IOUtils.closeWhileHandlingException(MockChannel.this); // pure paranoia
- }
- }
- }
- @Override
- protected void doRun() throws Exception {
- StreamInput input = new InputStreamStreamInput(new BufferedInputStream(activeChannel.getInputStream()));
- // There is a (slim) chance that we get interrupted right after a loop iteration, so check explicitly
- while (isOpen.get() && !Thread.currentThread().isInterrupted()) {
- cancellableThreads.executeIO(() -> readMessage(MockChannel.this, input));
- }
- }
- });
- }
- synchronized void close0() throws IOException {
- // establish a happens-before edge between closing and accepting a new connection
- // we have to sync this entire block to ensure that our openChannels checks work correctly.
- // The close block below will close all worker channels but if one of the worker channels runs into an exception
- // for instance due to a disconnect the handling of this exception might be executed concurrently.
- // now if we are in-turn concurrently call close we might not wait for the actual close to happen and that will, down the road
- // make the assertion trip that not all channels are closed.
- if (isOpen.compareAndSet(true, false)) {
- final boolean removedChannel;
- synchronized (openChannels) {
- removedChannel = openChannels.remove(this);
- }
- IOUtils.close(serverSocket, activeChannel, () -> IOUtils.close(workerChannels),
- () -> cancellableThreads.cancel("channel closed"));
- assert removedChannel: "Channel was not removed or removed twice?";
- }
- }
- @Override
- public String toString() {
- return "MockChannel{" +
- "profile='" + profile + '\'' +
- ", isOpen=" + isOpen +
- ", localAddress=" + localAddress +
- ", isServerSocket=" + (serverSocket != null) +
- '}';
- }
- @Override
- public void close() {
- try {
- close0();
- closeFuture.complete(null);
- } catch (IOException e) {
- closeFuture.completeExceptionally(e);
- }
- }
- @Override
- public String getProfile() {
- return profile;
- }
- @Override
- public boolean isServerChannel() {
- return isServer;
- }
- @Override
- public void addCloseListener(ActionListener<Void> listener) {
- closeFuture.addListener(ActionListener.toBiConsumer(listener));
- }
- @Override
- public void addConnectListener(ActionListener<Void> listener) {
- connectFuture.addListener(ActionListener.toBiConsumer(listener));
- }
- @Override
- public ChannelStats getChannelStats() {
- return stats;
- }
- @Override
- public boolean isOpen() {
- return isOpen.get();
- }
- @Override
- public InetSocketAddress getLocalAddress() {
- return localAddress;
- }
- @Override
- public InetSocketAddress getRemoteAddress() {
- return (InetSocketAddress) activeChannel.getRemoteSocketAddress();
- }
- @Override
- public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
- try {
- synchronized (this) {
- OutputStream outputStream = new BufferedOutputStream(activeChannel.getOutputStream());
- reference.writeTo(outputStream);
- outputStream.flush();
- }
- listener.onResponse(null);
- } catch (IOException e) {
- listener.onFailure(e);
- onException(this, e);
- }
- }
- }
- @Override
- protected void doStart() {
- boolean success = false;
- try {
- if (NetworkService.NETWORK_SERVER.get(settings)) {
- // loop through all profiles and start them up, special handling for default one
- for (ProfileSettings profileSettings : profileSettings) {
- bindServer(profileSettings);
- }
- }
- super.doStart();
- success = true;
- } finally {
- if (!success) {
- doStop();
- }
- }
- }
- @Override
- protected void stopInternal() {
- ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
- synchronized (openChannels) {
- assert openChannels.isEmpty() : "there are still open channels: " + openChannels;
- }
- }
diff --git a/server/sonar-server-common/src/testFixtures/java/org/sonar/server/es/EsTester.java b/server/sonar-server-common/src/testFixtures/java/org/sonar/server/es/EsTester.java
index 9fdb1cb3f81..86408377761 100644
--- a/server/sonar-server-common/src/testFixtures/java/org/sonar/server/es/EsTester.java
+++ b/server/sonar-server-common/src/testFixtures/java/org/sonar/server/es/EsTester.java
@@ -24,16 +24,19 @@ import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.ServerSocket;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang.reflect.ConstructorUtils;
@@ -48,31 +51,27 @@ import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
-import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.http.BindHttpException;
+import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
-import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.join.ParentJoinPlugin;
import org.elasticsearch.node.InternalSettingsPreparer;
import org.elasticsearch.node.Node;
-import org.elasticsearch.plugins.NetworkPlugin;
-import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.MockTcpTransport;
-import org.elasticsearch.transport.Transport;
+import org.elasticsearch.transport.Netty4Plugin;
import org.junit.rules.ExternalResource;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
import org.sonar.server.component.index.ComponentIndexDefinition;
import org.sonar.server.es.IndexDefinition.IndexDefinitionContext;
import org.sonar.server.es.IndexType.IndexRelationType;
@@ -84,8 +83,10 @@ import org.sonar.server.rule.index.RuleIndexDefinition;
import org.sonar.server.user.index.UserIndexDefinition;
import org.sonar.server.view.index.ViewIndexDefinition;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Lists.newArrayList;
+import static java.lang.String.format;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.sonar.server.es.Index.ALL_INDICES;
import static org.sonar.server.es.IndexType.FIELD_INDEX_TYPE;
@@ -93,6 +94,11 @@ import static org.sonar.server.es.newindex.DefaultIndexSettings.REFRESH_IMMEDIAT
public class EsTester extends ExternalResource {
+ private static final int MIN_PORT = 1;
+ private static final int MAX_PORT = 49151;
+ private static final int MIN_NON_ROOT_PORT = 1025;
+ private static final Logger LOG = Loggers.get(EsTester.class);
static {
System.setProperty("log4j.shutdownHookEnabled", "false");
// we can not shutdown logging when tests are running or the next test that runs within the
@@ -356,49 +362,89 @@ public class EsTester extends ExternalResource {
try {
Path tempDir = Files.createTempDirectory("EsTester");
- Settings settings = Settings.builder()
- .put(Environment.PATH_HOME_SETTING.getKey(), tempDir)
- .put("node.name", "EsTester")
- .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE)
- .put("logger.level", "INFO")
- .put("action.auto_create_index", false)
- // Default the watermarks to absurdly low to prevent the tests
- // from failing on nodes without enough disk space
- // always reduce this - it can make tests really slow
- .put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), TimeValue.timeValueMillis(20))
- .put(NetworkModule.TRANSPORT_TYPE_KEY, "local")
- .put(NetworkModule.HTTP_ENABLED.getKey(), false)
- .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "single-node")
- .build();
- Node node = new Node(InternalSettingsPreparer.prepareEnvironment(settings, null),
- ImmutableList.of(
- CommonAnalysisPlugin.class,
- // mock local transport plugin
- MockTcpTransportPlugin.class,
- // install ParentJoin plugin required to create field of type "join"
- ParentJoinPlugin.class),
- true) {
- @Override
- protected void registerDerivedNodeNameWithLogger(String nodeName) {
- // nothing to do
+ int i = 10;
+ while (i > 0) {
+ int httpPort = getNextAvailable();
+ try {
+ Node node = startNode(tempDir, httpPort);
+ LOG.info("EsTester running ElasticSearch on HTTP port {}", httpPort);
+ return node;
+ } catch (BindHttpException e) {
+ i--;
- };
- return node.start();
+ }
} catch (Exception e) {
throw new IllegalStateException("Fail to start embedded Elasticsearch", e);
+ throw new IllegalStateException("Failed to find an open port to connect EsTester's Elasticsearch instance after 10 attempts");
+ }
+ private static Node startNode(Path tempDir, int httpPort) throws NodeValidationException {
+ Settings settings = Settings.builder()
+ .put(Environment.PATH_HOME_SETTING.getKey(), tempDir)
+ .put("node.name", "EsTester")
+ .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE)
+ .put("logger.level", "INFO")
+ .put("action.auto_create_index", false)
+ // Default the watermarks to absurdly low to prevent the tests
+ // from failing on nodes without enough disk space
+ // always reduce this - it can make tests really slow
+ .put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), TimeValue.timeValueMillis(20))
+ .put(NetworkModule.HTTP_ENABLED.getKey(), true)
+ .put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), httpPort)
+ .put(HttpTransportSettings.SETTING_HTTP_BIND_HOST.getKey(), "localhost")
+ .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "single-node")
+ .build();
+ Node node = new Node(InternalSettingsPreparer.prepareEnvironment(settings, null),
+ ImmutableList.of(
+ CommonAnalysisPlugin.class,
+ // Netty4Plugin provides http and tcp transport
+ Netty4Plugin.class,
+ // install ParentJoin plugin required to create field of type "join"
+ ParentJoinPlugin.class),
+ true) {
+ @Override
+ protected void registerDerivedNodeNameWithLogger(String nodeName) {
+ // nothing to do
+ }
+ };
+ return node.start();
- public static final class MockTcpTransportPlugin extends Plugin implements NetworkPlugin {
- @Override
- public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
- CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
- return Collections.singletonMap(
- "local",
- () -> new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, circuitBreakerService, namedWriteableRegistry, networkService));
+ public static int getNextAvailable() {
+ Random random = new Random();
+ int maxAttempts = 10;
+ int i = maxAttempts;
+ while (i > 0) {
+ int port = MIN_NON_ROOT_PORT + random.nextInt(MAX_PORT - MIN_NON_ROOT_PORT);
+ if (available(port)) {
+ return port;
+ }
+ i--;
+ throw new NoSuchElementException(format("Could not find an available port in %s attempts", maxAttempts));
+ private static boolean available(int port) {
+ checkArgument(validPort(port), "Invalid port: %s", port);
+ try (ServerSocket ss = new ServerSocket(port)) {
+ ss.setReuseAddress(true);
+ try (DatagramSocket ds = new DatagramSocket(port)) {
+ ds.setReuseAddress(true);
+ }
+ return true;
+ } catch (IOException var13) {
+ return false;
+ }
+ }
+ private static boolean validPort(int fromPort) {
+ return fromPort >= MIN_PORT && fromPort <= MAX_PORT;
+ }