From 13825db7112ffc75e5c6d0355a22ddb5eff9c432 Mon Sep 17 00:00:00 2001 From: =?utf8?q?S=C3=A9bastien=20Lesaint?= Date: Wed, 8 Jan 2020 10:36:49 +0100 Subject: [PATCH] SONAR-12686 use Jetty for TCP ElasticSearch transport in unit tests LocalTransport is dropped in ElasticSearch 7.X --- .../transport/MockTcpTransport.java | 495 ------------------ .../java/org/sonar/server/es/EsTester.java | 144 +++-- 2 files changed, 95 insertions(+), 544 deletions(-) delete mode 100644 server/sonar-server-common/src/testFixtures/java/org/elasticsearch/transport/MockTcpTransport.java diff --git a/server/sonar-server-common/src/testFixtures/java/org/elasticsearch/transport/MockTcpTransport.java b/server/sonar-server-common/src/testFixtures/java/org/elasticsearch/transport/MockTcpTransport.java deleted file mode 100644 index 61ddd5b2a9d..00000000000 --- a/server/sonar-server-common/src/testFixtures/java/org/elasticsearch/transport/MockTcpTransport.java +++ /dev/null @@ -1,495 +0,0 @@ -/* - * SonarQube - * Copyright (C) 2009-2020 SonarSource SA - * mailto:info AT sonarsource DOT com - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 3 of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ -package org.elasticsearch.transport; - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.Closeable; -import java.io.EOFException; -import java.io.IOException; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.SocketException; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cli.SuppressForbidden; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.concurrent.CompletableContext; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.InputStreamStreamInput; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.network.NetworkService; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.CancellableThreads; -import org.elasticsearch.common.util.PageCacheRecycler; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.indices.breaker.CircuitBreakerService; -import org.elasticsearch.mocksocket.MockServerSocket; -import org.elasticsearch.mocksocket.MockSocket; -import org.elasticsearch.threadpool.ThreadPool; - -/** - * This is a socket based blocking TcpTransport implementation that is used for tests - * that need real networking. This implementation is a test only implementation that implements - * the networking layer in the worst possible way since it blocks and uses a thread per request model. - */ -public class MockTcpTransport extends TcpTransport { - private static final Logger logger = LogManager.getLogger(MockTcpTransport.class); - - /** - * A pre-built light connection profile that shares a single connection across all - * types. - */ - static final ConnectionProfile LIGHT_PROFILE; - - private final Set openChannels = new HashSet<>(); - - static { - ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); - builder.addConnections(1, - TransportRequestOptions.Type.BULK, - TransportRequestOptions.Type.PING, - TransportRequestOptions.Type.RECOVERY, - TransportRequestOptions.Type.REG, - TransportRequestOptions.Type.STATE); - LIGHT_PROFILE = builder.build(); - } - - private final ExecutorService executor; - - public MockTcpTransport(Settings settings, ThreadPool threadPool, BigArrays bigArrays, - CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService) { - this(settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService, Version.CURRENT); - } - - public MockTcpTransport(Settings settings, ThreadPool threadPool, BigArrays bigArrays, - CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService, Version mockVersion) { - super("mock-tcp-transport", settings, mockVersion, threadPool, PageCacheRecycler.NON_RECYCLING_INSTANCE, circuitBreakerService, - namedWriteableRegistry, networkService); - // we have our own crazy cached threadpool this one is not bounded at all... - // using the ES thread factory here is crucial for tests otherwise disruption tests won't block that thread - executor = Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, Transports.TEST_MOCK_TRANSPORT_THREAD_PREFIX)); - } - - @Override - protected MockChannel bind(final String name, InetSocketAddress address) throws IOException { - MockServerSocket socket = new MockServerSocket(); - socket.setReuseAddress(TransportSettings.TCP_REUSE_ADDRESS.get(settings)); - ByteSizeValue tcpReceiveBufferSize = TransportSettings.TCP_RECEIVE_BUFFER_SIZE.get(settings); - if (tcpReceiveBufferSize.getBytes() > 0) { - socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt()); - } - socket.bind(address); - MockChannel serverMockChannel = new MockChannel(socket, name); - CountDownLatch started = new CountDownLatch(1); - executor.execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - onException(serverMockChannel, e); - } - - @Override - protected void doRun() throws Exception { - started.countDown(); - serverMockChannel.accept(executor); - } - }); - try { - started.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - return serverMockChannel; - } - - private void readMessage(MockChannel mockChannel, StreamInput input) throws IOException { - Socket socket = mockChannel.activeChannel; - byte[] minimalHeader = new byte[TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE]; - try { - input.readFully(minimalHeader); - } catch (EOFException eof) { - throw new IOException("Connection reset by peer"); - } - - // Read message length will throw stream corrupted exception if the marker bytes incorrect - int msgSize = TcpTransport.readMessageLength(new BytesArray(minimalHeader)); - if (msgSize == -1) { - socket.getOutputStream().flush(); - } else { - final byte[] buffer = new byte[msgSize]; - input.readFully(buffer); - int expectedSize = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE + msgSize; - try (BytesStreamOutput output = new ReleasableBytesStreamOutput(expectedSize, bigArrays)) { - output.write(minimalHeader); - output.write(buffer); - consumeNetworkReads(mockChannel, output.bytes()); - } - } - } - - @Override - @SuppressForbidden(reason = "real socket for mocking remote connections") - protected MockChannel initiateChannel(DiscoveryNode node) throws IOException { - InetSocketAddress address = node.getAddress().address(); - final MockSocket socket = new MockSocket(); - final MockChannel channel = new MockChannel(socket, address, false, "none"); - - boolean success = false; - try { - configureSocket(socket); - success = true; - } finally { - if (!success) { - IOUtils.close(socket); - } - } - - executor.submit(() -> { - try { - socket.connect(address); - socket.setSoLinger(false, 0); - channel.connectFuture.complete(null); - channel.loopRead(executor); - } catch (Exception ex) { - channel.connectFuture.completeExceptionally(ex); - } - }); - - return channel; - } - - @Override - protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile connectionProfile) { - ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); - Set allTypesWithConnection = new HashSet<>(); - Set allTypesWithoutConnection = new HashSet<>(); - for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile.getHandles()) { - Set types = handle.getTypes(); - if (handle.length > 0) { - allTypesWithConnection.addAll(types); - } else { - allTypesWithoutConnection.addAll(types); - } - } - // make sure we maintain at least the types that are supported by this profile even if we only use a single channel for them. - builder.addConnections(1, allTypesWithConnection.toArray(new TransportRequestOptions.Type[0])); - if (!allTypesWithoutConnection.isEmpty()) { - builder.addConnections(0, allTypesWithoutConnection.toArray(new TransportRequestOptions.Type[0])); - } - builder.setHandshakeTimeout(connectionProfile.getHandshakeTimeout()); - builder.setConnectTimeout(connectionProfile.getConnectTimeout()); - builder.setPingInterval(connectionProfile.getPingInterval()); - builder.setCompressionEnabled(connectionProfile.getCompressionEnabled()); - return builder.build(); - } - - private void configureSocket(Socket socket) throws SocketException { - socket.setTcpNoDelay(TransportSettings.TCP_NO_DELAY.get(settings)); - ByteSizeValue tcpSendBufferSize = TransportSettings.TCP_SEND_BUFFER_SIZE.get(settings); - if (tcpSendBufferSize.getBytes() > 0) { - socket.setSendBufferSize(tcpSendBufferSize.bytesAsInt()); - } - ByteSizeValue tcpReceiveBufferSize = TransportSettings.TCP_RECEIVE_BUFFER_SIZE.get(settings); - if (tcpReceiveBufferSize.getBytes() > 0) { - socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt()); - } - socket.setReuseAddress(TransportSettings.TCP_REUSE_ADDRESS.get(settings)); - } - - public final class MockChannel implements Closeable, TcpChannel, TcpServerChannel { - private final AtomicBoolean isOpen = new AtomicBoolean(true); - private final InetSocketAddress localAddress; - private final ServerSocket serverSocket; - private final Set workerChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); - private final Socket activeChannel; - private final boolean isServer; - private final String profile; - private final CancellableThreads cancellableThreads = new CancellableThreads(); - private final CompletableContext closeFuture = new CompletableContext<>(); - private final CompletableContext connectFuture = new CompletableContext<>(); - private final ChannelStats stats = new ChannelStats(); - - /** - * Constructs a new MockChannel instance intended for handling the actual incoming / outgoing traffic. - * - * @param socket The client socket. Mut not be null. - * @param localAddress Address associated with the corresponding local server socket. Must not be null. - * @param profile The associated profile name. - */ - MockChannel(Socket socket, InetSocketAddress localAddress, boolean isServer, String profile) { - this.localAddress = localAddress; - this.activeChannel = socket; - this.isServer = isServer; - this.serverSocket = null; - this.profile = profile; - synchronized (openChannels) { - openChannels.add(this); - } - } - - /** - * Constructs a new MockChannel instance intended for accepting requests. - * - * @param serverSocket The associated server socket. Must not be null. - * @param profile The associated profile name. - */ - MockChannel(ServerSocket serverSocket, String profile) { - this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress(); - this.serverSocket = serverSocket; - this.profile = profile; - this.isServer = false; - this.activeChannel = null; - synchronized (openChannels) { - openChannels.add(this); - } - } - - public void accept(Executor executor) throws IOException { - while (isOpen.get()) { - Socket incomingSocket = serverSocket.accept(); - MockChannel incomingChannel = null; - try { - configureSocket(incomingSocket); - synchronized (this) { - if (isOpen.get()) { - InetSocketAddress localAddress = new InetSocketAddress(incomingSocket.getLocalAddress(), - incomingSocket.getPort()); - incomingChannel = new MockChannel(incomingSocket, localAddress, true, profile); - MockChannel finalIncomingChannel = incomingChannel; - incomingChannel.addCloseListener(new ActionListener() { - @Override - public void onResponse(Void aVoid) { - workerChannels.remove(finalIncomingChannel); - } - - @Override - public void onFailure(Exception e) { - workerChannels.remove(finalIncomingChannel); - } - }); - serverAcceptedChannel(incomingChannel); - //establish a happens-before edge between closing and accepting a new connection - workerChannels.add(incomingChannel); - - // this spawns a new thread immediately, so OK under lock - incomingChannel.loopRead(executor); - // the channel is properly registered and will be cleared by the close code. - incomingSocket = null; - incomingChannel = null; - } - } - } finally { - // ensure we don't leak sockets and channels in the failure case. Note that we null both - // if there are no exceptions so this becomes a no op. - IOUtils.closeWhileHandlingException(incomingSocket, incomingChannel); - } - } - } - - void loopRead(Executor executor) { - executor.execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - if (isOpen.get()) { - try { - onException(MockChannel.this, e); - } catch (Exception ex) { - logger.warn("failed on handling exception", ex); - IOUtils.closeWhileHandlingException(MockChannel.this); // pure paranoia - } - } - } - - @Override - protected void doRun() throws Exception { - StreamInput input = new InputStreamStreamInput(new BufferedInputStream(activeChannel.getInputStream())); - // There is a (slim) chance that we get interrupted right after a loop iteration, so check explicitly - while (isOpen.get() && !Thread.currentThread().isInterrupted()) { - cancellableThreads.executeIO(() -> readMessage(MockChannel.this, input)); - } - } - }); - } - - synchronized void close0() throws IOException { - // establish a happens-before edge between closing and accepting a new connection - // we have to sync this entire block to ensure that our openChannels checks work correctly. - // The close block below will close all worker channels but if one of the worker channels runs into an exception - // for instance due to a disconnect the handling of this exception might be executed concurrently. - // now if we are in-turn concurrently call close we might not wait for the actual close to happen and that will, down the road - // make the assertion trip that not all channels are closed. - if (isOpen.compareAndSet(true, false)) { - final boolean removedChannel; - synchronized (openChannels) { - removedChannel = openChannels.remove(this); - } - IOUtils.close(serverSocket, activeChannel, () -> IOUtils.close(workerChannels), - () -> cancellableThreads.cancel("channel closed")); - assert removedChannel: "Channel was not removed or removed twice?"; - } - } - - @Override - public String toString() { - return "MockChannel{" + - "profile='" + profile + '\'' + - ", isOpen=" + isOpen + - ", localAddress=" + localAddress + - ", isServerSocket=" + (serverSocket != null) + - '}'; - } - - @Override - public void close() { - try { - close0(); - closeFuture.complete(null); - } catch (IOException e) { - closeFuture.completeExceptionally(e); - } - } - - @Override - public String getProfile() { - return profile; - } - - @Override - public boolean isServerChannel() { - return isServer; - } - - @Override - public void addCloseListener(ActionListener listener) { - closeFuture.addListener(ActionListener.toBiConsumer(listener)); - } - - @Override - public void addConnectListener(ActionListener listener) { - connectFuture.addListener(ActionListener.toBiConsumer(listener)); - } - - @Override - public ChannelStats getChannelStats() { - return stats; - } - - @Override - public boolean isOpen() { - return isOpen.get(); - } - - @Override - public InetSocketAddress getLocalAddress() { - return localAddress; - } - - @Override - public InetSocketAddress getRemoteAddress() { - return (InetSocketAddress) activeChannel.getRemoteSocketAddress(); - } - - @Override - public void sendMessage(BytesReference reference, ActionListener listener) { - try { - synchronized (this) { - OutputStream outputStream = new BufferedOutputStream(activeChannel.getOutputStream()); - reference.writeTo(outputStream); - outputStream.flush(); - } - listener.onResponse(null); - } catch (IOException e) { - listener.onFailure(e); - onException(this, e); - } - } - } - - - @Override - protected void doStart() { - boolean success = false; - try { - if (NetworkService.NETWORK_SERVER.get(settings)) { - // loop through all profiles and start them up, special handling for default one - for (ProfileSettings profileSettings : profileSettings) { - bindServer(profileSettings); - } - } - super.doStart(); - success = true; - } finally { - if (!success) { - doStop(); - } - } - } - - @Override - protected void stopInternal() { - ThreadPool.terminate(executor, 10, TimeUnit.SECONDS); - synchronized (openChannels) { - assert openChannels.isEmpty() : "there are still open channels: " + openChannels; - } - } -} diff --git a/server/sonar-server-common/src/testFixtures/java/org/sonar/server/es/EsTester.java b/server/sonar-server-common/src/testFixtures/java/org/sonar/server/es/EsTester.java index 9fdb1cb3f81..86408377761 100644 --- a/server/sonar-server-common/src/testFixtures/java/org/sonar/server/es/EsTester.java +++ b/server/sonar-server-common/src/testFixtures/java/org/sonar/server/es/EsTester.java @@ -24,16 +24,19 @@ import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import java.io.IOException; +import java.net.DatagramSocket; +import java.net.ServerSocket; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Random; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang.reflect.ConstructorUtils; @@ -48,31 +51,27 @@ import org.elasticsearch.analysis.common.CommonAnalysisPlugin; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkModule; -import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.http.BindHttpException; +import org.elasticsearch.http.HttpTransportSettings; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; -import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.join.ParentJoinPlugin; import org.elasticsearch.node.InternalSettingsPreparer; import org.elasticsearch.node.Node; -import org.elasticsearch.plugins.NetworkPlugin; -import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.node.NodeValidationException; import org.elasticsearch.search.SearchHit; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.MockTcpTransport; -import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.Netty4Plugin; import org.junit.rules.ExternalResource; +import org.sonar.api.utils.log.Logger; +import org.sonar.api.utils.log.Loggers; import org.sonar.server.component.index.ComponentIndexDefinition; import org.sonar.server.es.IndexDefinition.IndexDefinitionContext; import org.sonar.server.es.IndexType.IndexRelationType; @@ -84,8 +83,10 @@ import org.sonar.server.rule.index.RuleIndexDefinition; import org.sonar.server.user.index.UserIndexDefinition; import org.sonar.server.view.index.ViewIndexDefinition; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Lists.newArrayList; +import static java.lang.String.format; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.sonar.server.es.Index.ALL_INDICES; import static org.sonar.server.es.IndexType.FIELD_INDEX_TYPE; @@ -93,6 +94,11 @@ import static org.sonar.server.es.newindex.DefaultIndexSettings.REFRESH_IMMEDIAT public class EsTester extends ExternalResource { + private static final int MIN_PORT = 1; + private static final int MAX_PORT = 49151; + private static final int MIN_NON_ROOT_PORT = 1025; + private static final Logger LOG = Loggers.get(EsTester.class); + static { System.setProperty("log4j.shutdownHookEnabled", "false"); // we can not shutdown logging when tests are running or the next test that runs within the @@ -356,49 +362,89 @@ public class EsTester extends ExternalResource { try { Path tempDir = Files.createTempDirectory("EsTester"); tempDir.toFile().deleteOnExit(); - Settings settings = Settings.builder() - .put(Environment.PATH_HOME_SETTING.getKey(), tempDir) - .put("node.name", "EsTester") - .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE) - .put("logger.level", "INFO") - .put("action.auto_create_index", false) - // Default the watermarks to absurdly low to prevent the tests - // from failing on nodes without enough disk space - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b") - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b") - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1b") - // always reduce this - it can make tests really slow - .put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), TimeValue.timeValueMillis(20)) - .put(NetworkModule.TRANSPORT_TYPE_KEY, "local") - .put(NetworkModule.HTTP_ENABLED.getKey(), false) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "single-node") - .build(); - Node node = new Node(InternalSettingsPreparer.prepareEnvironment(settings, null), - ImmutableList.of( - CommonAnalysisPlugin.class, - // mock local transport plugin - MockTcpTransportPlugin.class, - // install ParentJoin plugin required to create field of type "join" - ParentJoinPlugin.class), - true) { - @Override - protected void registerDerivedNodeNameWithLogger(String nodeName) { - // nothing to do + int i = 10; + while (i > 0) { + int httpPort = getNextAvailable(); + try { + Node node = startNode(tempDir, httpPort); + LOG.info("EsTester running ElasticSearch on HTTP port {}", httpPort); + return node; + } catch (BindHttpException e) { + i--; } - }; - return node.start(); + } } catch (Exception e) { throw new IllegalStateException("Fail to start embedded Elasticsearch", e); } + throw new IllegalStateException("Failed to find an open port to connect EsTester's Elasticsearch instance after 10 attempts"); + } + + private static Node startNode(Path tempDir, int httpPort) throws NodeValidationException { + Settings settings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), tempDir) + .put("node.name", "EsTester") + .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE) + .put("logger.level", "INFO") + .put("action.auto_create_index", false) + // Default the watermarks to absurdly low to prevent the tests + // from failing on nodes without enough disk space + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1b") + // always reduce this - it can make tests really slow + .put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), TimeValue.timeValueMillis(20)) + .put(NetworkModule.HTTP_ENABLED.getKey(), true) + .put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), httpPort) + .put(HttpTransportSettings.SETTING_HTTP_BIND_HOST.getKey(), "localhost") + .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "single-node") + .build(); + Node node = new Node(InternalSettingsPreparer.prepareEnvironment(settings, null), + ImmutableList.of( + CommonAnalysisPlugin.class, + // Netty4Plugin provides http and tcp transport + Netty4Plugin.class, + // install ParentJoin plugin required to create field of type "join" + ParentJoinPlugin.class), + true) { + @Override + protected void registerDerivedNodeNameWithLogger(String nodeName) { + // nothing to do + } + }; + return node.start(); } - public static final class MockTcpTransportPlugin extends Plugin implements NetworkPlugin { - @Override - public Map> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, - CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { - return Collections.singletonMap( - "local", - () -> new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, circuitBreakerService, namedWriteableRegistry, networkService)); + public static int getNextAvailable() { + Random random = new Random(); + int maxAttempts = 10; + int i = maxAttempts; + while (i > 0) { + int port = MIN_NON_ROOT_PORT + random.nextInt(MAX_PORT - MIN_NON_ROOT_PORT); + if (available(port)) { + return port; + } + i--; } + + throw new NoSuchElementException(format("Could not find an available port in %s attempts", maxAttempts)); } + + private static boolean available(int port) { + checkArgument(validPort(port), "Invalid port: %s", port); + + try (ServerSocket ss = new ServerSocket(port)) { + ss.setReuseAddress(true); + try (DatagramSocket ds = new DatagramSocket(port)) { + ds.setReuseAddress(true); + } + return true; + } catch (IOException var13) { + return false; + } + } + + private static boolean validPort(int fromPort) { + return fromPort >= MIN_PORT && fromPort <= MAX_PORT; + } + } -- 2.39.5