diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 107a4b32d89f9..a12f27c93e3c4 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.transport; +import java.util.function.Supplier; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -48,9 +49,20 @@ public abstract class RemoteClusterAware extends AbstractComponent { /** * A list of initial seed nodes to discover eligible nodes from the remote cluster */ - public static final Setting.AffixSetting> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting("search.remote.", - "seeds", (key) -> Setting.listSetting(key, Collections.emptyList(), RemoteClusterAware::parseSeedAddress, - Setting.Property.NodeScope, Setting.Property.Dynamic)); + public static final Setting.AffixSetting> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting( + "search.remote.", + "seeds", + key -> Setting.listSetting( + key, Collections.emptyList(), + s -> { + // validate seed address + parsePort(s); + return s; + }, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ) + ); public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':'; public static final String LOCAL_CLUSTER_GROUP_KEY = ""; @@ -65,18 +77,20 @@ protected RemoteClusterAware(Settings settings) { this.clusterNameResolver = new ClusterNameExpressionResolver(settings); } - protected static Map> buildRemoteClustersSeeds(Settings settings) { - Stream>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings); + protected static Map>> buildRemoteClustersSeeds(Settings settings) { + Stream>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings); return allConcreteSettings.collect( Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> { String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting); - List nodes = new ArrayList<>(); - for (InetSocketAddress address : concreteSetting.get(settings)) { - TransportAddress transportAddress = new TransportAddress(address); - DiscoveryNode node = new DiscoveryNode(clusterName + "#" + transportAddress.toString(), - transportAddress, - Version.CURRENT.minimumCompatibilityVersion()); - nodes.add(node); + List addresses = concreteSetting.get(settings); + List> nodes = new ArrayList<>(addresses.size()); + for (String address : addresses) { + nodes.add(() -> { + TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address)); + return new DiscoveryNode(clusterName + "#" + transportAddress.toString(), + transportAddress, + Version.CURRENT.minimumCompatibilityVersion()); + }); } return nodes; })); @@ -128,7 +142,7 @@ public Map> groupClusterIndices(String[] requestIndices, Pr * Subclasses must implement this to receive information about updated cluster aliases. If the given address list is * empty the cluster alias is unregistered and should be removed. */ - protected abstract void updateRemoteCluster(String clusterAlias, List addresses); + protected abstract void updateRemoteCluster(String clusterAlias, List addresses); /** * Registers this instance to listen to updates on the cluster settings. @@ -138,27 +152,35 @@ public void listenForUpdates(ClusterSettings clusterSettings) { (namespace, value) -> {}); } - private static InetSocketAddress parseSeedAddress(String remoteHost) { - int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300 - if (portSeparator == -1 || portSeparator == remoteHost.length()) { - throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead"); - } - String host = remoteHost.substring(0, portSeparator); + protected static InetSocketAddress parseSeedAddress(String remoteHost) { + String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost)); InetAddress hostAddress; try { hostAddress = InetAddress.getByName(host); } catch (UnknownHostException e) { throw new IllegalArgumentException("unknown host [" + host + "]", e); } + return new InetSocketAddress(hostAddress, parsePort(remoteHost)); + } + + private static int parsePort(String remoteHost) { try { - int port = Integer.valueOf(remoteHost.substring(portSeparator + 1)); + int port = Integer.valueOf(remoteHost.substring(indexOfPortSeparator(remoteHost) + 1)); if (port <= 0) { throw new IllegalArgumentException("port number must be > 0 but was: [" + port + "]"); } - return new InetSocketAddress(hostAddress, port); + return port; } catch (NumberFormatException e) { - throw new IllegalArgumentException("port must be a number", e); + throw new IllegalArgumentException("failed to parse port", e); + } + } + + private static int indexOfPortSeparator(String remoteHost) { + int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300 + if (portSeparator == -1 || portSeparator == remoteHost.length()) { + throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead"); } + return portSeparator; } public static String buildRemoteIndexName(String clusterAlias, String indexName) { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 67c0e1a5aa64a..15cf7899dc03f 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.transport; +import java.util.function.Supplier; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.SetOnce; @@ -84,7 +85,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo private final String clusterAlias; private final int maxNumRemoteConnections; private final Predicate nodePredicate; - private volatile List seedNodes; + private volatile List> seedNodes; private volatile boolean skipUnavailable; private final ConnectHandler connectHandler; private SetOnce remoteClusterName = new SetOnce<>(); @@ -99,7 +100,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo * @param maxNumRemoteConnections the maximum number of connections to the remote cluster * @param nodePredicate a predicate to filter eligible remote nodes to connect to */ - RemoteClusterConnection(Settings settings, String clusterAlias, List seedNodes, + RemoteClusterConnection(Settings settings, String clusterAlias, List> seedNodes, TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate) { super(settings); this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); @@ -127,7 +128,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo /** * Updates the list of seed nodes for this cluster connection */ - synchronized void updateSeedNodes(List seedNodes, ActionListener connectListener) { + synchronized void updateSeedNodes(List> seedNodes, ActionListener connectListener) { this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes)); connectHandler.connect(connectListener); } @@ -456,7 +457,7 @@ protected void doRun() { }); } - void collectRemoteNodes(Iterator seedNodes, + private void collectRemoteNodes(Iterator> seedNodes, final TransportService transportService, ActionListener listener) { if (Thread.currentThread().isInterrupted()) { listener.onFailure(new InterruptedException("remote connect thread got interrupted")); @@ -464,7 +465,7 @@ void collectRemoteNodes(Iterator seedNodes, try { if (seedNodes.hasNext()) { cancellableThreads.executeIO(() -> { - final DiscoveryNode seedNode = seedNodes.next(); + final DiscoveryNode seedNode = seedNodes.next().get(); final TransportService.HandshakeResponse handshakeResponse; Transport.Connection connection = transportService.openConnection(seedNode, ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null)); @@ -554,11 +555,11 @@ private class SniffClusterStateResponseHandler implements TransportResponseHandl private final TransportService transportService; private final Transport.Connection connection; private final ActionListener listener; - private final Iterator seedNodes; + private final Iterator> seedNodes; private final CancellableThreads cancellableThreads; SniffClusterStateResponseHandler(TransportService transportService, Transport.Connection connection, - ActionListener listener, Iterator seedNodes, + ActionListener listener, Iterator> seedNodes, CancellableThreads cancellableThreads) { this.transportService = transportService; this.connection = connection; @@ -651,7 +652,7 @@ void addConnectedNode(DiscoveryNode node) { * Get the information about remote nodes to be rendered on {@code _remote/info} requests. */ public RemoteConnectionInfo getConnectionInfo() { - List seedNodeAddresses = seedNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toList()); + List seedNodeAddresses = seedNodes.stream().map(node -> node.get().getAddress()).collect(Collectors.toList()); TimeValue initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); return new RemoteConnectionInfo(clusterAlias, seedNodeAddresses, maxNumRemoteConnections, connectedNodes.size(), initialConnectionTimeout, skipUnavailable); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index a07de63d53734..956a0d94179e0 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.transport; +import java.util.function.Supplier; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; @@ -40,7 +41,6 @@ import java.io.Closeable; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -115,7 +115,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl * @param seeds a cluster alias to discovery node mapping representing the remote clusters seeds nodes * @param connectionListener a listener invoked once every configured cluster has been connected to */ - private synchronized void updateRemoteClusters(Map> seeds, ActionListener connectionListener) { + private synchronized void updateRemoteClusters(Map>> seeds, + ActionListener connectionListener) { if (seeds.containsKey(LOCAL_CLUSTER_GROUP_KEY)) { throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); } @@ -125,7 +126,7 @@ private synchronized void updateRemoteClusters(Map> } else { CountDown countDown = new CountDown(seeds.size()); remoteClusters.putAll(this.remoteClusters); - for (Map.Entry> entry : seeds.entrySet()) { + for (Map.Entry>> entry : seeds.entrySet()) { RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey()); if (entry.getValue().isEmpty()) { // with no seed nodes we just remove the connection try { @@ -310,16 +311,17 @@ synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavail } } - protected void updateRemoteCluster(String clusterAlias, List addresses) { + @Override + protected void updateRemoteCluster(String clusterAlias, List addresses) { updateRemoteCluster(clusterAlias, addresses, ActionListener.wrap((x) -> {}, (x) -> {})); } void updateRemoteCluster( final String clusterAlias, - final List addresses, + final List addresses, final ActionListener connectionListener) { - final List nodes = addresses.stream().map(address -> { - final TransportAddress transportAddress = new TransportAddress(address); + final List> nodes = addresses.stream().>map(address -> () -> { + final TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address)); final String id = clusterAlias + "#" + transportAddress.toString(); final Version version = Version.CURRENT.minimumCompatibilityVersion(); return new DiscoveryNode(id, transportAddress, version); @@ -334,7 +336,7 @@ void updateRemoteCluster( void initializeRemoteClusters() { final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); final PlainActionFuture future = new PlainActionFuture<>(); - Map> seeds = RemoteClusterAware.buildRemoteClustersSeeds(settings); + Map>> seeds = RemoteClusterAware.buildRemoteClustersSeeds(settings); updateRemoteClusters(seeds, future); try { future.get(timeValue.millis(), TimeUnit.MILLISECONDS); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 8da4064d1c89a..3d0388ccfad96 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.transport; +import java.util.function.Supplier; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -158,8 +159,8 @@ public void testLocalProfileIsUsedForLocalCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { - updateSeedNodes(connection, Arrays.asList(seedNode)); + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { + updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); @@ -198,8 +199,8 @@ public void testRemoteProfileIsUsedForRemoteCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { - updateSeedNodes(connection, Arrays.asList(seedNode)); + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { + updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); @@ -254,8 +255,8 @@ public void testDiscoverSingleNode() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { - updateSeedNodes(connection, Arrays.asList(seedNode)); + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { + updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); @@ -276,7 +277,7 @@ public void testDiscoverSingleNodeWithIncompatibleSeed() throws Exception { knownNodes.add(discoverableTransport.getLocalDiscoNode()); knownNodes.add(incompatibleTransport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); - List seedNodes = Arrays.asList(incompatibleSeedNode, seedNode); + List> seedNodes = Arrays.asList(() -> incompatibleSeedNode, () -> seedNode); Collections.shuffle(seedNodes, random()); try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { @@ -310,8 +311,8 @@ public void testNodeDisconnected() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { - updateSeedNodes(connection, Arrays.asList(seedNode)); + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { + updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); assertFalse(service.nodeConnected(spareNode)); @@ -359,8 +360,8 @@ public void testFilterDiscoveredNodes() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false)) { - updateSeedNodes(connection, Arrays.asList(seedNode)); + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false)) { + updateSeedNodes(connection, Arrays.asList(() -> seedNode)); if (rejectedNode.equals(seedNode)) { assertFalse(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -374,7 +375,7 @@ public void testFilterDiscoveredNodes() throws Exception { } } - private void updateSeedNodes(RemoteClusterConnection connection, List seedNodes) throws Exception { + private void updateSeedNodes(RemoteClusterConnection connection, List> seedNodes) throws Exception { CountDownLatch latch = new CountDownLatch(1); AtomicReference exceptionAtomicReference = new AtomicReference<>(); ActionListener listener = ActionListener.wrap(x -> latch.countDown(), x -> { @@ -398,8 +399,8 @@ public void testConnectWithIncompatibleTransports() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { - expectThrows(Exception.class, () -> updateSeedNodes(connection, Arrays.asList(seedNode))); + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { + expectThrows(Exception.class, () -> updateSeedNodes(connection, Arrays.asList(() -> seedNode))); assertFalse(service.nodeConnected(seedNode)); assertTrue(connection.assertNoRunningConnections()); } @@ -461,7 +462,7 @@ public void close() { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { connection.addConnectedNode(seedNode); for (DiscoveryNode node : knownNodes) { final Transport.Connection transportConnection = connection.getConnection(node); @@ -504,7 +505,7 @@ public void run() { CountDownLatch listenerCalled = new CountDownLatch(1); AtomicReference exceptionReference = new AtomicReference<>(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { ActionListener listener = ActionListener.wrap(x -> { listenerCalled.countDown(); fail("expected exception"); @@ -512,7 +513,7 @@ public void run() { exceptionReference.set(x); listenerCalled.countDown(); }); - connection.updateSeedNodes(Arrays.asList(seedNode), listener); + connection.updateSeedNodes(Arrays.asList(() -> seedNode), listener); acceptedLatch.await(); connection.close(); // now close it, this should trigger an interrupt on the socket and we can move on assertTrue(connection.assertNoRunningConnections()); @@ -539,7 +540,7 @@ public void testFetchShards() throws Exception { try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { service.start(); service.acceptIncomingRequests(); - List nodes = Collections.singletonList(seedNode); + List> nodes = Collections.singletonList(() -> seedNode); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", nodes, service, Integer.MAX_VALUE, n -> true)) { if (randomBoolean()) { @@ -579,7 +580,7 @@ public void testFetchShardsThreadContextHeader() throws Exception { try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { service.start(); service.acceptIncomingRequests(); - List nodes = Collections.singletonList(seedNode); + List> nodes = Collections.singletonList(() -> seedNode); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", nodes, service, Integer.MAX_VALUE, n -> true)) { SearchRequest request = new SearchRequest("test-index"); @@ -635,7 +636,7 @@ public void testFetchShardsSkipUnavailable() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Collections.singletonList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { SearchRequest request = new SearchRequest("test-index"); ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") @@ -738,7 +739,7 @@ public void testTriggerUpdatesConcurrently() throws IOException, InterruptedExce knownNodes.add(discoverableTransport.getLocalDiscoNode()); knownNodes.add(seedTransport1.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); - List seedNodes = Arrays.asList(seedNode1, seedNode); + List> seedNodes = Arrays.asList(() -> seedNode1, () -> seedNode); Collections.shuffle(seedNodes, random()); try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { @@ -816,7 +817,7 @@ public void testCloseWhileConcurrentlyConnecting() throws IOException, Interrupt knownNodes.add(discoverableTransport.getLocalDiscoNode()); knownNodes.add(seedTransport1.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); - List seedNodes = Arrays.asList(seedNode1, seedNode); + List> seedNodes = Arrays.asList(() -> seedNode1, () -> seedNode); Collections.shuffle(seedNodes, random()); try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { @@ -904,7 +905,7 @@ public void testGetConnectionInfo() throws Exception { knownNodes.add(transport3.getLocalDiscoNode()); knownNodes.add(transport2.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); - List seedNodes = Arrays.asList(node3, node1, node2); + List> seedNodes = Arrays.asList(() -> node3, () -> node1, () -> node2); Collections.shuffle(seedNodes, random()); try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { @@ -1059,7 +1060,7 @@ public void testEnsureConnected() throws IOException, InterruptedException { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { assertFalse(service.nodeConnected(seedNode)); assertFalse(service.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); @@ -1108,9 +1109,9 @@ public void testCollectNodes() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { if (randomBoolean()) { - updateSeedNodes(connection, Arrays.asList(seedNode)); + updateSeedNodes(connection, Arrays.asList(() -> seedNode)); } CountDownLatch responseLatch = new CountDownLatch(1); AtomicReference> reference = new AtomicReference<>(); @@ -1142,14 +1143,14 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted List discoverableTransports = new CopyOnWriteArrayList<>(); try { final int numDiscoverableNodes = randomIntBetween(5, 20); - List discoverableNodes = new ArrayList<>(numDiscoverableNodes); - for (int i = 0; i < numDiscoverableNodes; i++) { + List> discoverableNodes = new ArrayList<>(numDiscoverableNodes); + for (int i = 0; i < numDiscoverableNodes; i++ ) { MockTransportService transportService = startTransport("discoverable_node" + i, knownNodes, Version.CURRENT); - discoverableNodes.add(transportService.getLocalDiscoNode()); + discoverableNodes.add(transportService::getLocalDiscoNode); discoverableTransports.add(transportService); } - List seedNodes = randomSubsetOf(discoverableNodes); + List> seedNodes = randomSubsetOf(discoverableNodes); Collections.shuffle(seedNodes, random()); try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { @@ -1198,7 +1199,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted discoverableTransports.add(transportService); connection.addConnectedNode(transportService.getLocalDiscoNode()); } else { - DiscoveryNode node = randomFrom(discoverableNodes); + DiscoveryNode node = randomFrom(discoverableNodes).get(); connection.onNodeDisconnected(node); } } @@ -1246,12 +1247,13 @@ public void testClusterNameIsChecked() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { - updateSeedNodes(connection, Arrays.asList(seedNode)); + Arrays.asList( () -> seedNode), service, Integer.MAX_VALUE, n -> true)) { + updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); - List discoveryNodes = Arrays.asList(otherClusterTransport.getLocalDiscoNode(), seedNode); + List> discoveryNodes = + Arrays.asList(() -> otherClusterTransport.getLocalDiscoNode(), () -> seedNode); Collections.shuffle(discoveryNodes, random()); updateSeedNodes(connection, discoveryNodes); assertTrue(service.nodeConnected(seedNode)); @@ -1262,7 +1264,7 @@ public void testClusterNameIsChecked() throws Exception { assertTrue(service.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, () -> - updateSeedNodes(connection, Arrays.asList(otherClusterTransport.getLocalDiscoNode()))); + updateSeedNodes(connection, Arrays.asList(() -> otherClusterTransport.getLocalDiscoNode()))); assertThat(illegalStateException.getMessage(), startsWith("handshake failed, mismatched cluster name [Cluster [otherCluster]]" + " - {other_cluster_discoverable_node}")); @@ -1325,7 +1327,7 @@ public void close() { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(connectedNode), service, Integer.MAX_VALUE, n -> true)) { + Collections.singletonList(() -> connectedNode), service, Integer.MAX_VALUE, n -> true)) { connection.addConnectedNode(connectedNode); for (int i = 0; i < 10; i++) { //always a direct connection as the remote node is already connected @@ -1348,4 +1350,34 @@ public void close() { } } } + + public void testLazyResolveTransportAddress() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + CountDownLatch multipleResolveLatch = new CountDownLatch(2); + Supplier seedSupplier = () -> { + multipleResolveLatch.countDown(); + return seedNode; + }; + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true)) { + updateSeedNodes(connection, Arrays.asList(seedSupplier)); + // Closing connections leads to RemoteClusterConnection.ConnectHandler.collectRemoteNodes + // being called again so we try to resolve the same seed node's host twice + discoverableTransport.close(); + seedTransport.close(); + assertTrue(multipleResolveLatch.await(30L, TimeUnit.SECONDS)); + } + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 03d76b5a953c6..c94b1cbdef547 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.transport; +import java.util.function.Supplier; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; @@ -103,10 +104,19 @@ public void testRemoteClusterSeedSetting() { .put("search.remote.foo.seeds", "192.168.0.1").build(); expectThrows(IllegalArgumentException.class, () -> RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(brokenSettings).forEach(setting -> setting.get(brokenSettings))); + + Settings brokenPortSettings = Settings.builder() + .put("search.remote.foo.seeds", "192.168.0.1:123456789123456789").build(); + Exception e = expectThrows( + IllegalArgumentException.class, + () -> RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(brokenSettings) + .forEach(setting -> setting.get(brokenPortSettings)) + ); + assertEquals("failed to parse port", e.getMessage()); } public void testBuiltRemoteClustersSeeds() throws Exception { - Map> map = RemoteClusterService.buildRemoteClustersSeeds( + Map>> map = RemoteClusterService.buildRemoteClustersSeeds( Settings.builder().put("search.remote.foo.seeds", "192.168.0.1:8080").put("search.remote.bar.seeds", "[::1]:9090").build()); assertEquals(2, map.size()); assertTrue(map.containsKey("foo")); @@ -114,13 +124,13 @@ public void testBuiltRemoteClustersSeeds() throws Exception { assertEquals(1, map.get("foo").size()); assertEquals(1, map.get("bar").size()); - DiscoveryNode foo = map.get("foo").get(0); + DiscoveryNode foo = map.get("foo").get(0).get(); assertEquals(foo.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("192.168.0.1"), 8080))); assertEquals(foo.getId(), "foo#192.168.0.1:8080"); assertEquals(foo.getVersion(), Version.CURRENT.minimumCompatibilityVersion()); - DiscoveryNode bar = map.get("bar").get(0); + DiscoveryNode bar = map.get("bar").get(0).get(); assertEquals(bar.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("[::1]"), 9090))); assertEquals(bar.getId(), "bar#[::1]:9090"); assertEquals(bar.getVersion(), Version.CURRENT.minimumCompatibilityVersion()); @@ -194,10 +204,10 @@ public void testIncrementallyAddClusters() throws IOException { assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); assertFalse(service.isCrossClusterSearchEnabled()); - service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().address())); + service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString())); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); - service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().address())); + service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().toString())); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); assertTrue(service.isRemoteClusterRegistered("cluster_2")); @@ -252,22 +262,17 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { service.initializeRemoteClusters(); assertFalse(service.isCrossClusterSearchEnabled()); - final InetSocketAddress c1N1Address = c1N1Node.getAddress().address(); - final InetSocketAddress c1N2Address = c1N2Node.getAddress().address(); - final InetSocketAddress c2N1Address = c2N1Node.getAddress().address(); - final InetSocketAddress c2N2Address = c2N2Node.getAddress().address(); - final CountDownLatch firstLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_1", - Arrays.asList(c1N1Address, c1N2Address), + Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", - Arrays.asList(c2N1Address, c2N2Address), + Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), connectionListener(secondLatch)); secondLatch.await(); @@ -321,22 +326,17 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { service.initializeRemoteClusters(); assertFalse(service.isCrossClusterSearchEnabled()); - final InetSocketAddress c1N1Address = c1N1Node.getAddress().address(); - final InetSocketAddress c1N2Address = c1N2Node.getAddress().address(); - final InetSocketAddress c2N1Address = c2N1Node.getAddress().address(); - final InetSocketAddress c2N2Address = c2N2Node.getAddress().address(); - final CountDownLatch firstLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_1", - Arrays.asList(c1N1Address, c1N2Address), + Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", - Arrays.asList(c2N1Address, c2N2Address), + Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), connectionListener(secondLatch)); secondLatch.await(); @@ -398,22 +398,17 @@ public void testCollectNodes() throws InterruptedException, IOException { service.initializeRemoteClusters(); assertFalse(service.isCrossClusterSearchEnabled()); - final InetSocketAddress c1N1Address = c1N1Node.getAddress().address(); - final InetSocketAddress c1N2Address = c1N2Node.getAddress().address(); - final InetSocketAddress c2N1Address = c2N1Node.getAddress().address(); - final InetSocketAddress c2N2Address = c2N2Node.getAddress().address(); - final CountDownLatch firstLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_1", - Arrays.asList(c1N1Address, c1N2Address), + Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", - Arrays.asList(c2N1Address, c2N2Address), + Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), connectionListener(secondLatch)); secondLatch.await(); CountDownLatch latch = new CountDownLatch(1); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index 2247cbe02a812..c388fd5627c32 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -30,7 +30,6 @@ import org.elasticsearch.xpack.core.graph.action.GraphExploreRequest; import org.elasticsearch.xpack.core.security.authz.IndicesAndAliasesResolverField; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -429,7 +428,7 @@ protected Set getRemoteClusterNames() { } @Override - protected void updateRemoteCluster(String clusterAlias, List addresses) { + protected void updateRemoteCluster(String clusterAlias, List addresses) { if (addresses.isEmpty()) { clusters.remove(clusterAlias); } else {