diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index efa296b6278af..9b1b0a9c90df2 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -73,7 +73,7 @@ protected Version getCurrentVersion() { } }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet()); + MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; } diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java b/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java index e14f684bf72ef..429cb46790b9f 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java @@ -18,7 +18,9 @@ */ package org.elasticsearch.transport; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import java.util.ArrayList; @@ -26,6 +28,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -70,6 +73,55 @@ private ConnectionProfile(List handles, int numConnections } /** +<<<<<<< HEAD +======= + * takes a {@link ConnectionProfile} resolves it to a fully specified (i.e., no nulls) profile + */ + public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionProfile profile, ConnectionProfile fallbackProfile) { + Objects.requireNonNull(fallbackProfile); + if (profile == null) { + return fallbackProfile; + } else if (profile.getConnectTimeout() != null && profile.getHandshakeTimeout() != null) { + return profile; + } else { + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(profile); + if (profile.getConnectTimeout() == null) { + builder.setConnectTimeout(fallbackProfile.getConnectTimeout()); + } + if (profile.getHandshakeTimeout() == null) { + builder.setHandshakeTimeout(fallbackProfile.getHandshakeTimeout()); + } + return builder.build(); + } + } + + /** + * Builds a default connection profile based on the provided settings. + * + * @param settings to build the connection profile from + * @return the connection profile + */ + public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) { + int connectionsPerNodeRecovery = TcpTransport.CONNECTIONS_PER_NODE_RECOVERY.get(settings); + int connectionsPerNodeBulk = TcpTransport.CONNECTIONS_PER_NODE_BULK.get(settings); + int connectionsPerNodeReg = TcpTransport.CONNECTIONS_PER_NODE_REG.get(settings); + int connectionsPerNodeState = TcpTransport.CONNECTIONS_PER_NODE_STATE.get(settings); + int connectionsPerNodePing = TcpTransport.CONNECTIONS_PER_NODE_PING.get(settings); + Builder builder = new Builder(); + builder.setConnectTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings)); + builder.setHandshakeTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings)); + builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK); + builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING); + // if we are not master eligible we don't need a dedicated channel to publish the state + builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE); + // if we are not a data-node we don't need any dedicated channels for recovery + builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY); + builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG); + return builder.build(); + } + + /** +>>>>>>> 419ad4569dc... Reduce channels in AbstractSimpleTransportTestCase (#34863) (#34880) * A builder to build a new {@link ConnectionProfile} */ public static class Builder { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index fb8c54540ae97..6cfe25455be5b 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -110,7 +110,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake); protected int channelsPerNodeConnection() { - return 13; + // This is a customized profile for this test case. + return 6; } @Override @@ -119,9 +120,17 @@ public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool(getClass().getName()); clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - serviceA = buildService("TS_A", version0, clusterSettings); // this one supports dynamic tracer updates + Settings connectionSettings = Settings.builder() + .put(TcpTransport.CONNECTIONS_PER_NODE_RECOVERY.getKey(), 1) + .put(TcpTransport.CONNECTIONS_PER_NODE_BULK.getKey(), 1) + .put(TcpTransport.CONNECTIONS_PER_NODE_REG.getKey(), 2) + .put(TcpTransport.CONNECTIONS_PER_NODE_STATE.getKey(), 1) + .put(TcpTransport.CONNECTIONS_PER_NODE_PING.getKey(), 1) + .build(); + + serviceA = buildService("TS_A", version0, clusterSettings, connectionSettings); // this one supports dynamic tracer updates nodeA = serviceA.getLocalNode(); - serviceB = buildService("TS_B", version1, null); // this one doesn't support dynamic tracer updates + serviceB = buildService("TS_B", version1, null, connectionSettings); // this one doesn't support dynamic tracer updates nodeB = serviceB.getLocalNode(); // wait till all nodes are properly connected and the event has been sent, so tests in this class // will not get this callback called on the connections done in this setup @@ -167,8 +176,13 @@ private MockTransportService buildService(final String name, final Version versi return service; } - private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) { - return buildService(name, version, clusterSettings, Settings.EMPTY, true, true); + protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) { + return buildService(name, version, clusterSettings, Settings.EMPTY); + } + + protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings, + Settings settings) { + return buildService(name, version, clusterSettings, settings, true, true); } @Override diff --git a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java index e9f5f86462f54..b0a61c5a7d0a4 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java @@ -50,7 +50,7 @@ protected Version executeHandshake(DiscoveryNode node, TcpChannel mockChannel, T } }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet()); + MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; }