From 6f0cfb52e61600fa9e42cfe7a78d307076c014a5 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 21 Aug 2018 16:47:46 +0200 Subject: [PATCH 1/3] Add proxy support to RemoteClusterConnection This adds support for connecting to a remote cluster through a tcp proxy. A remote cluster can configured with an additional `search.remote.$clustername.proxy` setting. This proxy will be used to connect to remote nodes for every node connection established. We still try to sniff the remote clsuter and connect to nodes directly through the proxy which has to support some kind of routing to these nodes. Yet, this routing mechanism requires the handshake request to include some kind of information where to route to which is not yet implemented. The effort to use the hostname and an optional node attribute for routing is tracked in #32517 Closes #31840 --- .../common/settings/ClusterSettings.java | 1 + .../common/settings/Setting.java | 4 + .../transport/RemoteClusterAware.java | 56 +++++++-- .../transport/RemoteClusterConnection.java | 36 +++++- .../transport/RemoteClusterService.java | 41 +++---- .../RemoteClusterConnectionTests.java | 111 +++++++++++++++++- .../transport/RemoteClusterServiceTests.java | 104 +++++++++++++--- .../test/transport/MockTransportService.java | 9 +- .../authz/IndicesAndAliasesResolver.java | 2 +- 9 files changed, 306 insertions(+), 58 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index bf53a3dc01a7a..237fc911db627 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -272,6 +272,7 @@ public void apply(Settings value, Settings current, Settings previous) { ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, + RemoteClusterAware.REMOTE_CLUSTERS_PROXY, RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE, RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER, RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/elasticsearch/common/settings/Setting.java b/server/src/main/java/org/elasticsearch/common/settings/Setting.java index b98c2753d701b..7b432c0ed1e18 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/server/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -1009,6 +1009,10 @@ public static Setting simpleString(String key, Property... properties) { return new Setting<>(key, s -> "", Function.identity(), properties); } + public static Setting simpleString(String key, Function parser, Property... properties) { + return new Setting<>(key, s -> "", parser, properties); + } + public static Setting simpleString(String key, Setting fallback, Property... properties) { return new Setting<>(key, fallback, Function.identity(), properties); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index a12f27c93e3c4..8d5a9bb523c0e 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -18,10 +18,14 @@ */ package org.elasticsearch.transport; +import java.util.EnumSet; import java.util.function.Supplier; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -66,6 +70,22 @@ public abstract class RemoteClusterAware extends AbstractComponent { public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':'; public static final String LOCAL_CLUSTER_GROUP_KEY = ""; + /** + * A proxy address for the remote cluster. + * NOTE: this settings is undocumented until we have at last one transport that supports passing + * on the hostname via a mechanism like SNI. + */ + public static final Setting.AffixSetting REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting( + "search.remote.", + "proxy", + key -> Setting.simpleString(key, s -> { + if (Strings.hasLength(s)) { + parsePort(s); + } + return s; + }, Setting.Property.NodeScope, Setting.Property.Dynamic), REMOTE_CLUSTERS_SEEDS); + + protected final ClusterNameExpressionResolver clusterNameResolver; /** @@ -77,25 +97,37 @@ protected RemoteClusterAware(Settings settings) { this.clusterNameResolver = new ClusterNameExpressionResolver(settings); } - protected static Map>> buildRemoteClustersSeeds(Settings settings) { + protected static Map>>> buildRemoteClustersSeeds(Settings settings) { Stream>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings); return allConcreteSettings.collect( Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> { String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting); List addresses = concreteSetting.get(settings); + final boolean proxyMode = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).exists(settings); List> nodes = new ArrayList<>(addresses.size()); for (String address : addresses) { - nodes.add(() -> { - TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address)); - return new DiscoveryNode(clusterName + "#" + transportAddress.toString(), - transportAddress, - Version.CURRENT.minimumCompatibilityVersion()); - }); + nodes.add(() -> buildSeedNode(clusterName, address, proxyMode)); } - return nodes; + return new Tuple<>(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).get(settings), nodes); })); } + public static DiscoveryNode buildSeedNode(String clusterName, String address, boolean proxyMode) { + if (proxyMode) { + TransportAddress transportAddress = new TransportAddress(TransportAddress.META_ADDRESS, 0); + String hostName = address.substring(0, indexOfPortSeparator(address)); + return new DiscoveryNode("", clusterName + "#" + address, UUIDs.randomBase64UUID(), hostName, address, + transportAddress, Collections + .emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class), + Version.CURRENT.minimumCompatibilityVersion()); + } else { + TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address)); + return new DiscoveryNode(clusterName + "#" + transportAddress.toString(), + transportAddress, + Version.CURRENT.minimumCompatibilityVersion()); + } + } + /** * Groups indices per cluster by splitting remote cluster-alias, index-name pairs on {@link #REMOTE_CLUSTER_INDEX_SEPARATOR}. All * indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under @@ -138,20 +170,24 @@ public Map> groupClusterIndices(String[] requestIndices, Pr protected abstract Set getRemoteClusterNames(); + /** * Subclasses must implement this to receive information about updated cluster aliases. If the given address list is * empty the cluster alias is unregistered and should be removed. */ - protected abstract void updateRemoteCluster(String clusterAlias, List addresses); + protected abstract void updateRemoteCluster(String clusterAlias, List addresses, String proxy); /** * Registers this instance to listen to updates on the cluster settings. */ public void listenForUpdates(ClusterSettings clusterSettings) { - clusterSettings.addAffixUpdateConsumer(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, this::updateRemoteCluster, + clusterSettings.addAffixUpdateConsumer(RemoteClusterAware.REMOTE_CLUSTERS_PROXY, + RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, + (key, value) -> updateRemoteCluster(key, value.v2(), value.v1()), (namespace, value) -> {}); } + protected static InetSocketAddress parseSeedAddress(String remoteHost) { String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost)); InetAddress hostAddress; diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 5621b38557814..9076f90b1fbdf 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.transport; +import java.net.InetSocketAddress; import java.util.function.Supplier; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; @@ -34,6 +35,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; @@ -88,6 +90,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo private final int maxNumRemoteConnections; private final Predicate nodePredicate; private final ThreadPool threadPool; + private volatile String proxyAddress; private volatile List> seedNodes; private volatile boolean skipUnavailable; private final ConnectHandler connectHandler; @@ -106,6 +109,13 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo RemoteClusterConnection(Settings settings, String clusterAlias, List> seedNodes, TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, Predicate nodePredicate) { + this(settings, clusterAlias, seedNodes, transportService, connectionManager, maxNumRemoteConnections, nodePredicate, null); + } + + RemoteClusterConnection(Settings settings, String clusterAlias, List> seedNodes, + TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, Predicate + nodePredicate, + String proxyAddress) { super(settings); this.transportService = transportService; this.maxNumRemoteConnections = maxNumRemoteConnections; @@ -130,13 +140,26 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo connectionManager.addListener(this); // we register the transport service here as a listener to make sure we notify handlers on disconnect etc. connectionManager.addListener(transportService); + this.proxyAddress = proxyAddress; + } + + private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, DiscoveryNode node) { + if (proxyAddress == null || proxyAddress.isEmpty()) { + return node; + } else { + // resovle proxy address lazy here + InetSocketAddress proxyInetAddress = RemoteClusterAware.parseSeedAddress(proxyAddress); + return new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(), node + .getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion()); + } } /** * Updates the list of seed nodes for this cluster connection */ - synchronized void updateSeedNodes(List> seedNodes, ActionListener connectListener) { + synchronized void updateSeedNodes(String proxyAddress, List> seedNodes, ActionListener connectListener) { this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes)); + this.proxyAddress = proxyAddress; connectHandler.connect(connectListener); } @@ -281,6 +304,7 @@ Transport.Connection getConnection(DiscoveryNode remoteClusterNode) { return new ProxyConnection(connection, remoteClusterNode); } + static final class ProxyConnection implements Transport.Connection { private final Transport.Connection proxyConnection; private final DiscoveryNode targetNode; @@ -461,7 +485,7 @@ private void collectRemoteNodes(Iterator> seedNodes, try { if (seedNodes.hasNext()) { cancellableThreads.executeIO(() -> { - final DiscoveryNode seedNode = seedNodes.next().get(); + final DiscoveryNode seedNode = maybeAddProxyAddress(proxyAddress, seedNodes.next().get()); final TransportService.HandshakeResponse handshakeResponse; Transport.Connection connection = manager.openConnection(seedNode, ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null)); @@ -476,7 +500,7 @@ private void collectRemoteNodes(Iterator> seedNodes, throw ex; } - final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode(); + final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode()); if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) { manager.connectToNode(handshakeNode, remoteProfile, transportService.connectionValidator(handshakeNode)); if (remoteClusterName.get() == null) { @@ -583,7 +607,8 @@ public void handleResponse(ClusterStateResponse response) { cancellableThreads.executeIO(() -> { DiscoveryNodes nodes = response.getState().nodes(); Iterable nodesIter = nodes.getNodes()::valuesIt; - for (DiscoveryNode node : nodesIter) { + for (DiscoveryNode n : nodesIter) { + DiscoveryNode node = maybeAddProxyAddress(proxyAddress, n); if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) { try { connectionManager.connectToNode(node, remoteProfile, @@ -646,7 +671,8 @@ void addConnectedNode(DiscoveryNode node) { * Get the information about remote nodes to be rendered on {@code _remote/info} requests. */ public RemoteConnectionInfo getConnectionInfo() { - List seedNodeAddresses = seedNodes.stream().map(node -> node.get().getAddress()).collect(Collectors.toList()); + List seedNodeAddresses = seedNodes.stream().map(node -> node.get().getAddress()).collect + (Collectors.toList()); TimeValue initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); return new RemoteConnectionInfo(clusterAlias, seedNodeAddresses, maxNumRemoteConnections, connectedNodes.size(), initialConnectionTimeout, skipUnavailable); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 34f13b672874f..9a80d6b81bb0c 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -31,10 +31,10 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.core.internal.io.IOUtils; @@ -116,8 +116,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl * @param seeds a cluster alias to discovery node mapping representing the remote clusters seeds nodes * @param connectionListener a listener invoked once every configured cluster has been connected to */ - private synchronized void updateRemoteClusters(Map>> seeds, - ActionListener connectionListener) { + private synchronized void updateRemoteClusters(Map>>> seeds, + ActionListener connectionListener) { if (seeds.containsKey(LOCAL_CLUSTER_GROUP_KEY)) { throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); } @@ -127,9 +127,12 @@ private synchronized void updateRemoteClusters(Map>> entry : seeds.entrySet()) { + for (Map.Entry>>> entry : seeds.entrySet()) { + List> seedList = entry.getValue().v2(); + String proxyAddress = entry.getValue().v1(); + RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey()); - if (entry.getValue().isEmpty()) { // with no seed nodes we just remove the connection + if (seedList.isEmpty()) { // with no seed nodes we just remove the connection try { IOUtils.close(remote); } catch (IOException e) { @@ -140,15 +143,15 @@ private synchronized void updateRemoteClusters(Map { if (countDown.countDown()) { connectionListener.onResponse(response); @@ -302,8 +305,7 @@ protected Set getRemoteClusterNames() { @Override public void listenForUpdates(ClusterSettings clusterSettings) { super.listenForUpdates(clusterSettings); - clusterSettings.addAffixUpdateConsumer(REMOTE_CLUSTER_SKIP_UNAVAILABLE, this::updateSkipUnavailable, - (clusterAlias, value) -> {}); + clusterSettings.addAffixUpdateConsumer(REMOTE_CLUSTER_SKIP_UNAVAILABLE, this::updateSkipUnavailable, (alias, value) -> {}); } synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) { @@ -313,22 +315,21 @@ synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavail } } + @Override - protected void updateRemoteCluster(String clusterAlias, List addresses) { - updateRemoteCluster(clusterAlias, addresses, ActionListener.wrap((x) -> {}, (x) -> {})); + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress) { + updateRemoteCluster(clusterAlias, addresses, proxyAddress, ActionListener.wrap((x) -> {}, (x) -> {})); } void updateRemoteCluster( final String clusterAlias, final List addresses, + final String proxyAddress, final ActionListener connectionListener) { - final List> nodes = addresses.stream().>map(address -> () -> { - final TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address)); - final String id = clusterAlias + "#" + transportAddress.toString(); - final Version version = Version.CURRENT.minimumCompatibilityVersion(); - return new DiscoveryNode(id, transportAddress, version); - }).collect(Collectors.toList()); - updateRemoteClusters(Collections.singletonMap(clusterAlias, nodes), connectionListener); + final List> nodes = addresses.stream().>map(address -> () -> + buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress)) + ).collect(Collectors.toList()); + updateRemoteClusters(Collections.singletonMap(clusterAlias, new Tuple<>(proxyAddress, nodes)), connectionListener); } /** @@ -338,7 +339,7 @@ void updateRemoteCluster( void initializeRemoteClusters() { final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); final PlainActionFuture future = new PlainActionFuture<>(); - Map>> seeds = RemoteClusterAware.buildRemoteClustersSeeds(settings); + Map>>> seeds = RemoteClusterAware.buildRemoteClustersSeeds(settings); updateRemoteClusters(seeds, future); try { future.get(timeValue.millis(), TimeUnit.MILLISECONDS); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index e40486d63dc40..0ecc8705a47a1 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.transport; +import java.util.HashMap; +import java.util.Map; import java.util.function.Supplier; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; @@ -52,6 +54,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.test.transport.StubbableTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -378,15 +381,19 @@ public void testFilterDiscoveredNodes() throws Exception { } } } - private void updateSeedNodes(RemoteClusterConnection connection, List> seedNodes) throws Exception { + updateSeedNodes(connection, seedNodes, null); + } + + private void updateSeedNodes(RemoteClusterConnection connection, List> seedNodes, String proxyAddress) + throws Exception { CountDownLatch latch = new CountDownLatch(1); AtomicReference exceptionAtomicReference = new AtomicReference<>(); ActionListener listener = ActionListener.wrap(x -> latch.countDown(), x -> { exceptionAtomicReference.set(x); latch.countDown(); }); - connection.updateSeedNodes(seedNodes, listener); + connection.updateSeedNodes(proxyAddress, seedNodes, listener); latch.await(); if (exceptionAtomicReference.get() != null) { throw exceptionAtomicReference.get(); @@ -517,7 +524,7 @@ public void run() { exceptionReference.set(x); listenerCalled.countDown(); }); - connection.updateSeedNodes(Arrays.asList(() -> seedNode), listener); + connection.updateSeedNodes(null, Arrays.asList(() -> seedNode), listener); acceptedLatch.await(); connection.close(); // now close it, this should trigger an interrupt on the socket and we can move on assertTrue(connection.assertNoRunningConnections()); @@ -787,7 +794,7 @@ public void run() { throw new AssertionError(x); } }); - connection.updateSeedNodes(seedNodes, listener); + connection.updateSeedNodes(null, seedNodes, listener); } latch.await(); } catch (Exception ex) { @@ -875,7 +882,7 @@ public void run() { } }); try { - connection.updateSeedNodes(seedNodes, listener); + connection.updateSeedNodes(null, seedNodes, listener); } catch (Exception e) { // it's ok if we're shutting down assertThat(e.getMessage(), containsString("threadcontext is already closed")); @@ -1384,4 +1391,98 @@ public void testLazyResolveTransportAddress() throws Exception { } } } + + public void testProxyMode() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("node_0", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("node_1", knownNodes, Version.CURRENT)) { + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + final String proxyAddress = "1.1.1.1:99"; + Map nodes = new HashMap<>(); + nodes.put("node_0", seedTransport.getLocalDiscoNode()); + nodes.put("node_1", discoverableTransport.getLocalDiscoNode()); + Transport mockTcpTransport = getProxyTransport(threadPool, Collections.singletonMap(proxyAddress, nodes)); + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, mockTcpTransport, Version.CURRENT, + threadPool, null, Collections.emptySet())) { + service.start(); + service.acceptIncomingRequests(); + Supplier seedSupplier = () -> + RemoteClusterAware.buildSeedNode("some-remote-cluster", "node_0:" + randomIntBetween(1, 10000), true); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Arrays.asList(seedSupplier), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, proxyAddress)) { + updateSeedNodes(connection, Arrays.asList(seedSupplier), proxyAddress); + assertEquals(2, connection.getNumNodesConnected()); + assertNotNull(connection.getConnection(discoverableTransport.getLocalDiscoNode())); + assertNotNull(connection.getConnection(seedTransport.getLocalDiscoNode())); + assertEquals(proxyAddress, connection.getConnection(seedTransport.getLocalDiscoNode()) + .getNode().getAddress().toString()); + assertEquals(proxyAddress, connection.getConnection(discoverableTransport.getLocalDiscoNode()) + .getNode().getAddress().toString()); + service.getConnectionManager().disconnectFromNode(knownNodes.get(0)); + // ensure we reconnect + assertBusy(() -> { + assertEquals(2, connection.getNumNodesConnected()); + }); + discoverableTransport.close(); + seedTransport.close(); + } + } + } + } + + public static Transport getProxyTransport(ThreadPool threadPool, Map> nodeMap) { + if (nodeMap.isEmpty()) { + throw new IllegalArgumentException("nodeMap must be non-empty"); + } + + return new StubbableTransport(MockTransportService.newMockTransport(Settings.EMPTY, Version + .CURRENT, threadPool)) { + @Override + public Connection openConnection(final DiscoveryNode node, ConnectionProfile profile) { + Map proxyMapping = nodeMap.get(node.getAddress().toString()); + if (proxyMapping == null) { + throw new IllegalStateException("no proxy mapping for node: " + node); + } + DiscoveryNode proxyNode = proxyMapping.get(node.getName()); + if (proxyNode == null) { + // this is a seednode - lets pick one randomly + assertEquals("seed node must not have a port in the hostname: " + node.getHostName(), + -1, node.getHostName().lastIndexOf(':')); + assertTrue("missing hostname: " + node, proxyMapping.containsKey(node.getHostName())); + // route by seed hostname + proxyNode = proxyMapping.get(node.getHostName()); + } + Connection connection = super.openConnection(proxyNode, profile); + return new Connection() { + @Override + public DiscoveryNode getNode() { + return node; + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws IOException, TransportException { + connection.sendRequest(requestId, action, request, options); + } + + @Override + public void addCloseListener(ActionListener listener) { + connection.addCloseListener(listener); + } + + @Override + public boolean isClosed() { + return connection.isClosed(); + } + + @Override + public void close() { + connection.close(); + } + }; + } + }; + } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index f1929e72d8b33..7dd70ff68d386 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.AbstractScopedSettings; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -55,6 +56,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Predicate; +import java.util.stream.Collectors; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.instanceOf; @@ -116,21 +118,21 @@ public void testRemoteClusterSeedSetting() { } public void testBuiltRemoteClustersSeeds() throws Exception { - Map>> map = RemoteClusterService.buildRemoteClustersSeeds( + Map>>> map = RemoteClusterService.buildRemoteClustersSeeds( Settings.builder().put("search.remote.foo.seeds", "192.168.0.1:8080").put("search.remote.bar.seeds", "[::1]:9090").build()); assertEquals(2, map.size()); assertTrue(map.containsKey("foo")); assertTrue(map.containsKey("bar")); - assertEquals(1, map.get("foo").size()); - assertEquals(1, map.get("bar").size()); + assertEquals(1, map.get("foo").v2().size()); + assertEquals(1, map.get("bar").v2().size()); - DiscoveryNode foo = map.get("foo").get(0).get(); + DiscoveryNode foo = map.get("foo").v2().get(0).get(); assertEquals(foo.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("192.168.0.1"), 8080))); assertEquals(foo.getId(), "foo#192.168.0.1:8080"); assertEquals(foo.getVersion(), Version.CURRENT.minimumCompatibilityVersion()); - DiscoveryNode bar = map.get("bar").get(0).get(); + DiscoveryNode bar = map.get("bar").v2().get(0).get(); assertEquals(bar.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("[::1]"), 9090))); assertEquals(bar.getId(), "bar#[::1]:9090"); assertEquals(bar.getVersion(), Version.CURRENT.minimumCompatibilityVersion()); @@ -204,17 +206,17 @@ public void testIncrementallyAddClusters() throws IOException { assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); assertFalse(service.isCrossClusterSearchEnabled()); - service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString())); + service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); - service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().toString())); + service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().toString()), null); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); assertTrue(service.isRemoteClusterRegistered("cluster_2")); - service.updateRemoteCluster("cluster_2", Collections.emptyList()); + service.updateRemoteCluster("cluster_2", Collections.emptyList(), null); assertFalse(service.isRemoteClusterRegistered("cluster_2")); IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, - () -> service.updateRemoteCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList())); + () -> service.updateRemoteCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList(), null)); assertEquals("remote clusters must not have the empty string as its key", iae.getMessage()); } } @@ -265,14 +267,14 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { final CountDownLatch firstLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_1", - Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), + Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null, connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", - Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), + Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null, connectionListener(secondLatch)); secondLatch.await(); @@ -330,14 +332,14 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { final CountDownLatch firstLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_1", - Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), + Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null, connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", - Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), + Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null, connectionListener(secondLatch)); secondLatch.await(); @@ -403,14 +405,14 @@ public void testCollectNodes() throws InterruptedException, IOException { final CountDownLatch firstLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_1", - Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), + Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null, connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", - Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), + Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null, connectionListener(secondLatch)); secondLatch.await(); CountDownLatch latch = new CountDownLatch(1); @@ -827,4 +829,76 @@ public void testGetNodePredicatesCombination() { assertFalse(nodePredicate.test(node)); } } + + public void testRemoteClusterWithProxy() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService cluster_1_node0 = startTransport("cluster_1_node0", knownNodes, Version.CURRENT); + MockTransportService cluster_1_node_1 = startTransport("cluster_1_node1", knownNodes, Version.CURRENT); + MockTransportService cluster_2_node0 = startTransport("cluster_2_node0", Collections.emptyList(), Version.CURRENT)) { + knownNodes.add(cluster_1_node0.getLocalDiscoNode()); + knownNodes.add(cluster_1_node_1.getLocalDiscoNode()); + String cluster1Proxy = "1.1.1.1:99"; + String cluster2Proxy = "2.2.2.2:99"; + Map nodesCluster1 = new HashMap<>(); + nodesCluster1.put("cluster_1_node0", cluster_1_node0.getLocalDiscoNode()); + nodesCluster1.put("cluster_1_node1", cluster_1_node_1.getLocalDiscoNode()); + Map> mapping = new HashMap<>(); + mapping.put(cluster1Proxy, nodesCluster1); + mapping.put(cluster2Proxy, Collections.singletonMap("cluster_2_node0", cluster_2_node0.getLocalDiscoNode())); + + Collections.shuffle(knownNodes, random()); + Transport proxyTransport = RemoteClusterConnectionTests.getProxyTransport(threadPool, mapping); + try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, proxyTransport, + Version.CURRENT, threadPool, null, Collections.emptySet());) { + transportService.start(); + transportService.acceptIncomingRequests(); + Settings.Builder builder = Settings.builder(); + builder.putList("search.remote.cluster_1.seeds", "cluster_1_node0:8080"); + builder.put("search.remote.cluster_1.proxy", cluster1Proxy); + try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertTrue(service.isCrossClusterSearchEnabled()); + updateRemoteCluster(service, "cluster_1", Collections.singletonList("cluster_1_node1:8081"), cluster1Proxy); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + assertFalse(service.isRemoteClusterRegistered("cluster_2")); + updateRemoteCluster(service, "cluster_2", Collections.singletonList("cluster_2_node0:9300"), cluster2Proxy); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + assertTrue(service.isRemoteClusterRegistered("cluster_2")); + List infos = service.getRemoteConnectionInfos().collect(Collectors.toList()); + for (RemoteConnectionInfo info : infos) { + switch (info.clusterAlias) { + case "cluster_1": + assertEquals(2, info.numNodesConnected); + break; + case "cluster_2": + assertEquals(1, info.numNodesConnected); + break; + default: + fail("unknown cluster: " + info.clusterAlias); + } + } + service.updateRemoteCluster("cluster_2", Collections.emptyList(), randomBoolean() ? cluster2Proxy : null); + assertFalse(service.isRemoteClusterRegistered("cluster_2")); + } + } + } + } + + private void updateRemoteCluster(RemoteClusterService service, String clusterAlias, List addresses, String proxyAddress) + throws Exception { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference exceptionAtomicReference = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap(x -> latch.countDown(), x -> { + exceptionAtomicReference.set(x); + latch.countDown(); + }); + service.updateRemoteCluster(clusterAlias, addresses, proxyAddress, listener); + latch.await(); + if (exceptionAtomicReference.get() != null) { + throw exceptionAtomicReference.get(); + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 15ab06d651e92..d6c4f30a885d5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -95,6 +95,12 @@ public List> getSettings() { public static MockTransportService createNewService(Settings settings, Version version, ThreadPool threadPool, @Nullable ClusterSettings clusterSettings) { + MockTcpTransport mockTcpTransport = newMockTransport(settings, version, threadPool); + return createNewService(settings, mockTcpTransport, version, threadPool, clusterSettings, + Collections.emptySet()); + } + + public static MockTcpTransport newMockTransport(Settings settings, Version version, ThreadPool threadPool) { // some tests use MockTransportService to do network based testing. Yet, we run tests in multiple JVMs that means // concurrent tests could claim port that another JVM just released and if that test tries to simulate a disconnect it might // be smart enough to re-connect depending on what is tested. To reduce the risk, since this is very hard to debug we use @@ -102,9 +108,8 @@ public static MockTransportService createNewService(Settings settings, Version v int basePort = 10300 + (JVM_ORDINAL * 100); // use a non-default port otherwise some cluster in this JVM might reuse a port settings = Settings.builder().put(TcpTransport.PORT.getKey(), basePort + "-" + (basePort + 100)).put(settings).build(); NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); - final Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, + return new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version); - return createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); } public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool, diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index 3cf2034cc74b8..82e646b0ab23a 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -428,7 +428,7 @@ protected Set getRemoteClusterNames() { } @Override - protected void updateRemoteCluster(String clusterAlias, List addresses) { + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress) { if (addresses.isEmpty()) { clusters.remove(clusterAlias); } else { From 074aa862ab03a47bdcf59567a280c1d3354916bc Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 23 Aug 2018 10:34:01 +0200 Subject: [PATCH 2/3] fix imports --- .../org/elasticsearch/transport/RemoteClusterConnection.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 9076f90b1fbdf..6b1909434655f 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -35,7 +35,6 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; From 6e75318e82d02fcb442ac97c3ce6d508bb9b75e7 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 24 Aug 2018 10:31:19 +0200 Subject: [PATCH 3/3] Apply feedback --- .../transport/RemoteClusterAware.java | 9 ++++++-- .../transport/RemoteClusterService.java | 2 +- .../RemoteClusterConnectionTests.java | 15 ++++++------ .../transport/RemoteClusterServiceTests.java | 23 +++++++++++++++---- .../test/transport/StubbableTransport.java | 8 ++++++- .../authz/IndicesAndAliasesResolver.java | 2 +- 6 files changed, 41 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 8d5a9bb523c0e..16d3c292bfe32 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -97,7 +97,12 @@ protected RemoteClusterAware(Settings settings) { this.clusterNameResolver = new ClusterNameExpressionResolver(settings); } - protected static Map>>> buildRemoteClustersSeeds(Settings settings) { + /** + * Builds the dynamic per-cluster config from the given settings. This is a map keyed by the cluster alias that points to a tuple + * (ProxyAddresss, [SeedNodeSuppliers]). If a cluster is configured with a proxy address all seed nodes will point to + * {@link TransportAddress#META_ADDRESS} and their configured address will be used as the hostname for the generated discovery node. + */ + protected static Map>>> buildRemoteClustersDynamicConfig(Settings settings) { Stream>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings); return allConcreteSettings.collect( Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> { @@ -112,7 +117,7 @@ protected static Map>>> build })); } - public static DiscoveryNode buildSeedNode(String clusterName, String address, boolean proxyMode) { + static DiscoveryNode buildSeedNode(String clusterName, String address, boolean proxyMode) { if (proxyMode) { TransportAddress transportAddress = new TransportAddress(TransportAddress.META_ADDRESS, 0); String hostName = address.substring(0, indexOfPortSeparator(address)); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 9a80d6b81bb0c..60126847cbea9 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -339,7 +339,7 @@ void updateRemoteCluster( void initializeRemoteClusters() { final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); final PlainActionFuture future = new PlainActionFuture<>(); - Map>>> seeds = RemoteClusterAware.buildRemoteClustersSeeds(settings); + Map>>> seeds = RemoteClusterAware.buildRemoteClustersDynamicConfig(settings); updateRemoteClusters(seeds, future); try { future.get(timeValue.millis(), TimeUnit.MILLISECONDS); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 0ecc8705a47a1..88b01c66898a0 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -1437,10 +1437,9 @@ public static Transport getProxyTransport(ThreadPool threadPool, Map { Map proxyMapping = nodeMap.get(node.getAddress().toString()); if (proxyMapping == null) { throw new IllegalStateException("no proxy mapping for node: " + node); @@ -1454,8 +1453,8 @@ public Connection openConnection(final DiscoveryNode node, ConnectionProfile pro // route by seed hostname proxyNode = proxyMapping.get(node.getHostName()); } - Connection connection = super.openConnection(proxyNode, profile); - return new Connection() { + Transport.Connection connection = t.openConnection(proxyNode, profile); + return new Transport.Connection() { @Override public DiscoveryNode getNode() { return node; @@ -1482,7 +1481,7 @@ public void close() { connection.close(); } }; - } - }; + }); + return stubbableTransport; } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 4c56d129429b5..9d42b4e458dbe 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -117,17 +117,22 @@ public void testRemoteClusterSeedSetting() { assertEquals("failed to parse port", e.getMessage()); } - public void testBuiltRemoteClustersSeeds() throws Exception { - Map>>> map = RemoteClusterService.buildRemoteClustersSeeds( - Settings.builder().put("search.remote.foo.seeds", "192.168.0.1:8080").put("search.remote.bar.seeds", "[::1]:9090").build()); - assertEquals(2, map.size()); + public void testBuildRemoteClustersDynamicConfig() throws Exception { + Map>>> map = RemoteClusterService.buildRemoteClustersDynamicConfig( + Settings.builder().put("search.remote.foo.seeds", "192.168.0.1:8080") + .put("search.remote.bar.seeds", "[::1]:9090") + .put("search.remote.boom.seeds", "boom-node1.internal:1000") + .put("search.remote.boom.proxy", "foo.bar.com:1234").build()); + assertEquals(3, map.size()); assertTrue(map.containsKey("foo")); assertTrue(map.containsKey("bar")); + assertTrue(map.containsKey("boom")); assertEquals(1, map.get("foo").v2().size()); assertEquals(1, map.get("bar").v2().size()); + assertEquals(1, map.get("boom").v2().size()); DiscoveryNode foo = map.get("foo").v2().get(0).get(); - + assertEquals("", map.get("foo").v1()); assertEquals(foo.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("192.168.0.1"), 8080))); assertEquals(foo.getId(), "foo#192.168.0.1:8080"); assertEquals(foo.getVersion(), Version.CURRENT.minimumCompatibilityVersion()); @@ -135,7 +140,15 @@ public void testBuiltRemoteClustersSeeds() throws Exception { DiscoveryNode bar = map.get("bar").v2().get(0).get(); assertEquals(bar.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("[::1]"), 9090))); assertEquals(bar.getId(), "bar#[::1]:9090"); + assertEquals("", map.get("bar").v1()); assertEquals(bar.getVersion(), Version.CURRENT.minimumCompatibilityVersion()); + + DiscoveryNode boom = map.get("boom").v2().get(0).get(); + assertEquals(boom.getAddress(), new TransportAddress(TransportAddress.META_ADDRESS, 0)); + assertEquals("boom-node1.internal", boom.getHostName()); + assertEquals(boom.getId(), "boom#boom-node1.internal:1000"); + assertEquals("foo.bar.com:1234", map.get("boom").v1()); + assertEquals(boom.getVersion(), Version.CURRENT.minimumCompatibilityVersion()); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java index 2e78f8a9a4f04..d35fe609c0855 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java @@ -41,7 +41,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -public class StubbableTransport implements Transport { +public final class StubbableTransport implements Transport { private final ConcurrentHashMap sendBehaviors = new ConcurrentHashMap<>(); private final ConcurrentHashMap connectBehaviors = new ConcurrentHashMap<>(); @@ -60,6 +60,12 @@ boolean setDefaultSendBehavior(SendRequestBehavior sendBehavior) { return prior == null; } + public boolean setDefaultConnectBehavior(OpenConnectionBehavior openConnectionBehavior) { + OpenConnectionBehavior prior = this.defaultConnectBehavior; + this.defaultConnectBehavior = openConnectionBehavior; + return prior == null; + } + boolean addSendBehavior(TransportAddress transportAddress, SendRequestBehavior sendBehavior) { return sendBehaviors.put(transportAddress, sendBehavior) == null; } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index 82e646b0ab23a..34aed55bb2903 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -418,7 +418,7 @@ private static class RemoteClusterResolver extends RemoteClusterAware { private RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings) { super(settings); - clusters = new CopyOnWriteArraySet<>(buildRemoteClustersSeeds(settings).keySet()); + clusters = new CopyOnWriteArraySet<>(buildRemoteClustersDynamicConfig(settings).keySet()); listenForUpdates(clusterSettings); }