From 5cb9d6847c83e1448f60f0d8df9f57a46ace88e4 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 16 Apr 2018 16:11:27 -0400 Subject: [PATCH] CCS: Drop http address from remote cluster info They are expensive to fetch and no longer needed by Kibana so they *shouldn't* be needed by anyone else either. Closes #29207 --- docs/reference/cluster/remote-info.asciidoc | 3 - .../release-notes/7.0.0-alpha1.asciidoc | 6 +- .../remote/TransportRemoteInfoAction.java | 5 +- .../cluster/RestRemoteClusterInfoAction.java | 13 +-- .../transport/RemoteClusterConnection.java | 66 ++------------- .../transport/RemoteClusterService.java | 15 +--- .../transport/RemoteConnectionInfo.java | 67 ++++++++------- .../RemoteClusterConnectionTests.java | 83 +++---------------- 8 files changed, 73 insertions(+), 185 deletions(-) diff --git a/docs/reference/cluster/remote-info.asciidoc b/docs/reference/cluster/remote-info.asciidoc index d044f4dcad221..3dfcc201e7ac4 100644 --- a/docs/reference/cluster/remote-info.asciidoc +++ b/docs/reference/cluster/remote-info.asciidoc @@ -19,9 +19,6 @@ the configured remote cluster alias. `seeds`:: The configured initial seed transport addresses of the remote cluster. -`http_addresses`:: - The published http addresses of all connected remote nodes. - `connected`:: True if there is at least one connection to the remote cluster. diff --git a/docs/reference/release-notes/7.0.0-alpha1.asciidoc b/docs/reference/release-notes/7.0.0-alpha1.asciidoc index 128a9b7dd716b..e4cc50062a359 100644 --- a/docs/reference/release-notes/7.0.0-alpha1.asciidoc +++ b/docs/reference/release-notes/7.0.0-alpha1.asciidoc @@ -8,4 +8,8 @@ The changes listed below have been released for the first time in Elasticsearch === Breaking changes Core:: -* Tribe node has been removed in favor of Cross-Cluster-Search \ No newline at end of file +* Tribe node has been removed in favor of Cross-Cluster-Search + +Cross-Cluster-Search:: +* `http_addresses` has been removed from the <> API + because it is expensive to fetch and no longer needed by Kibana. diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java index 0410f920c8a9a..36974633559b6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java @@ -30,6 +30,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import static java.util.stream.Collectors.toList; + public final class TransportRemoteInfoAction extends HandledTransportAction { private final RemoteClusterService remoteClusterService; @@ -45,7 +47,6 @@ public TransportRemoteInfoAction(Settings settings, ThreadPool threadPool, Trans @Override protected void doExecute(RemoteInfoRequest remoteInfoRequest, ActionListener listener) { - remoteClusterService.getRemoteConnectionInfos(ActionListener.wrap(remoteConnectionInfos - -> listener.onResponse(new RemoteInfoResponse(remoteConnectionInfos)), listener::onFailure)); + listener.onResponse(new RemoteInfoResponse(remoteClusterService.getRemoteConnectionInfos().collect(toList()))); } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestRemoteClusterInfoAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestRemoteClusterInfoAction.java index 75baf8cecaaa5..c17be138df19a 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestRemoteClusterInfoAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestRemoteClusterInfoAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.RestBuilderListener; +import org.elasticsearch.rest.action.RestToXContentListener; import java.io.IOException; @@ -50,16 +51,8 @@ public String getName() { } @Override - public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) - throws IOException { - return channel -> client.execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest(), - new RestBuilderListener(channel) { - @Override - public RestResponse buildResponse(RemoteInfoResponse response, XContentBuilder builder) throws Exception { - response.toXContent(builder, request); - return new BytesRestResponse(RestStatus.OK, builder); - } - }); + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + return channel -> client.execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest(), new RestToXContentListener<>(channel)); } @Override public boolean canTripCircuitBreaker() { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index fb4586d201bd7..f24a1a928d50f 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -602,66 +603,13 @@ void addConnectedNode(DiscoveryNode node) { } /** - * Fetches connection info for this connection + * Get the information about remote nodes to be rendered on {@code _remote/info} requests. */ - public void getConnectionInfo(ActionListener listener) { - final Optional anyNode = connectedNodes.getAny(); - if (anyNode.isPresent() == false) { - // not connected we return immediately - RemoteConnectionInfo remoteConnectionStats = new RemoteConnectionInfo(clusterAlias, - Collections.emptyList(), Collections.emptyList(), maxNumRemoteConnections, 0, - RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings), skipUnavailable); - listener.onResponse(remoteConnectionStats); - } else { - NodesInfoRequest request = new NodesInfoRequest(); - request.clear(); - request.http(true); - - transportService.sendRequest(anyNode.get(), NodesInfoAction.NAME, request, new TransportResponseHandler() { - @Override - public NodesInfoResponse newInstance() { - return new NodesInfoResponse(); - } - - @Override - public void handleResponse(NodesInfoResponse response) { - Collection httpAddresses = new HashSet<>(); - for (NodeInfo info : response.getNodes()) { - if (connectedNodes.contains(info.getNode()) && info.getHttp() != null) { - httpAddresses.add(info.getHttp().getAddress().publishAddress()); - } - } - - if (httpAddresses.size() < maxNumRemoteConnections) { - // just in case non of the connected nodes have http enabled we get other http enabled nodes instead. - for (NodeInfo info : response.getNodes()) { - if (nodePredicate.test(info.getNode()) && info.getHttp() != null) { - httpAddresses.add(info.getHttp().getAddress().publishAddress()); - } - if (httpAddresses.size() == maxNumRemoteConnections) { - break; // once we have enough return... - } - } - } - RemoteConnectionInfo remoteConnectionInfo = new RemoteConnectionInfo(clusterAlias, - seedNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toList()), new ArrayList<>(httpAddresses), - maxNumRemoteConnections, connectedNodes.size(), - RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings), skipUnavailable); - listener.onResponse(remoteConnectionInfo); - } - - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - } - + public RemoteConnectionInfo getConnectionInfo() { + List seedNodeAddresses = seedNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toList()); + TimeValue initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); + return new RemoteConnectionInfo(clusterAlias, seedNodeAddresses, maxNumRemoteConnections, connectedNodes.size(), + initialConnectionTimeout, skipUnavailable); } int getNumNodesConnected() { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index f454571301777..0b4fcf3711fd5 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -42,6 +42,7 @@ import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -56,6 +57,7 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.elasticsearch.common.settings.Setting.boolSetting; @@ -348,17 +350,8 @@ public void close() throws IOException { IOUtils.close(remoteClusters.values()); } - public void getRemoteConnectionInfos(ActionListener> listener) { - final Map remoteClusters = this.remoteClusters; - if (remoteClusters.isEmpty()) { - listener.onResponse(Collections.emptyList()); - } else { - final GroupedActionListener actionListener = new GroupedActionListener<>(listener, - remoteClusters.size(), Collections.emptyList()); - for (RemoteClusterConnection connection : remoteClusters.values()) { - connection.getConnectionInfo(actionListener); - } - } + public Stream getRemoteConnectionInfos() { + return remoteClusters.values().stream().map(RemoteClusterConnection::getConnectionInfo); } /** diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java index cb51f7edce570..60067e18573ad 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java @@ -27,17 +27,18 @@ import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; +import static java.util.Collections.emptyList; + import java.io.IOException; import java.util.List; import java.util.Objects; /** * This class encapsulates all remote cluster information to be rendered on - * _remote/info requests. + * {@code _remote/info} requests. */ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable { final List seedNodes; - final List httpAddresses; final int connectionsPerCluster; final TimeValue initialConnectionTimeout; final int numNodesConnected; @@ -45,12 +46,10 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable final boolean skipUnavailable; RemoteConnectionInfo(String clusterAlias, List seedNodes, - List httpAddresses, int connectionsPerCluster, int numNodesConnected, TimeValue initialConnectionTimeout, boolean skipUnavailable) { this.clusterAlias = clusterAlias; this.seedNodes = seedNodes; - this.httpAddresses = httpAddresses; this.connectionsPerCluster = connectionsPerCluster; this.numNodesConnected = numNodesConnected; this.initialConnectionTimeout = initialConnectionTimeout; @@ -59,16 +58,45 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable public RemoteConnectionInfo(StreamInput input) throws IOException { seedNodes = input.readList(TransportAddress::new); - httpAddresses = input.readList(TransportAddress::new); + if (input.getVersion().before(Version.V_7_0_0_alpha1)) { + /* + * Versions before 7.0 sent the HTTP addresses of all nodes in the + * remote cluster here but it was expensive to fetch and we + * ultimately figured out how to do without it. So we removed it. + * + * We just throw any HTTP addresses received here on the floor + * because we don't need to do anything with them. + */ + input.readList(TransportAddress::new); + } connectionsPerCluster = input.readVInt(); initialConnectionTimeout = input.readTimeValue(); numNodesConnected = input.readVInt(); clusterAlias = input.readString(); - if (input.getVersion().onOrAfter(Version.V_6_1_0)) { - skipUnavailable = input.readBoolean(); - } else { - skipUnavailable = false; + skipUnavailable = input.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(seedNodes); + if (out.getVersion().before(Version.V_7_0_0_alpha1)) { + /* + * Versions before 7.0 sent the HTTP addresses of all nodes in the + * remote cluster here but it was expensive to fetch and we + * ultimately figured out how to do without it. So we removed it. + * + * When sending this request to a node that expects HTTP addresses + * here we pretend that we didn't find any. This *should* be fine + * because, after all, we haven't been using this information for + * a while. + */ + out.writeList(emptyList()); } + out.writeVInt(connectionsPerCluster); + out.writeTimeValue(initialConnectionTimeout); + out.writeVInt(numNodesConnected); + out.writeString(clusterAlias); + out.writeBoolean(skipUnavailable); } @Override @@ -80,11 +108,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.value(addr.toString()); } builder.endArray(); - builder.startArray("http_addresses"); - for (TransportAddress addr : httpAddresses) { - builder.value(addr.toString()); - } - builder.endArray(); builder.field("connected", numNodesConnected > 0); builder.field("num_nodes_connected", numNodesConnected); builder.field("max_connections_per_cluster", connectionsPerCluster); @@ -95,19 +118,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeList(seedNodes); - out.writeList(httpAddresses); - out.writeVInt(connectionsPerCluster); - out.writeTimeValue(initialConnectionTimeout); - out.writeVInt(numNodesConnected); - out.writeString(clusterAlias); - if (out.getVersion().onOrAfter(Version.V_6_1_0)) { - out.writeBoolean(skipUnavailable); - } - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -116,7 +126,6 @@ public boolean equals(Object o) { return connectionsPerCluster == that.connectionsPerCluster && numNodesConnected == that.numNodesConnected && Objects.equals(seedNodes, that.seedNodes) && - Objects.equals(httpAddresses, that.httpAddresses) && Objects.equals(initialConnectionTimeout, that.initialConnectionTimeout) && Objects.equals(clusterAlias, that.clusterAlias) && skipUnavailable == that.skipUnavailable; @@ -124,7 +133,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(seedNodes, httpAddresses, connectionsPerCluster, initialConnectionTimeout, + return Objects.hash(seedNodes, connectionsPerCluster, initialConnectionTimeout, numNodesConnected, clusterAlias, skipUnavailable); } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 0d8a469981966..69096677664b3 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; @@ -717,22 +718,6 @@ public void run() { } } - private static void installNodeStatsHandler(TransportService service, DiscoveryNode...nodes) { - service.registerRequestHandler(NodesInfoAction.NAME, NodesInfoRequest::new, ThreadPool.Names.SAME, false, false, - (request, channel) -> { - List nodeInfos = new ArrayList<>(); - int port = 80; - for (DiscoveryNode node : nodes) { - HttpInfo http = new HttpInfo(new BoundTransportAddress(new TransportAddress[]{node.getAddress()}, - new TransportAddress(node.getAddress().address().getAddress(), port++)), 100); - nodeInfos.add(new NodeInfo(node.getVersion(), Build.CURRENT, node, null, null, null, null, null, null, http, null, - null, null)); - } - channel.sendResponse(new NodesInfoResponse(ClusterName.DEFAULT, nodeInfos, Collections.emptyList())); - }); - - } - public void testGetConnectionInfo() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService transport1 = startTransport("seed_node", knownNodes, Version.CURRENT); @@ -753,34 +738,24 @@ public void testGetConnectionInfo() throws Exception { service.acceptIncomingRequests(); int maxNumConnections = randomIntBetween(1, 5); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, maxNumConnections, n -> true)) { + seedNodes, service, maxNumConnections, n -> true)) { // test no nodes connected - RemoteConnectionInfo remoteConnectionInfo = assertSerialization(getRemoteConnectionInfo(connection)); + RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); assertNotNull(remoteConnectionInfo); assertEquals(0, remoteConnectionInfo.numNodesConnected); - assertEquals(0, remoteConnectionInfo.seedNodes.size()); - assertEquals(0, remoteConnectionInfo.httpAddresses.size()); + assertEquals(3, remoteConnectionInfo.seedNodes.size()); assertEquals(maxNumConnections, remoteConnectionInfo.connectionsPerCluster); assertEquals("test-cluster", remoteConnectionInfo.clusterAlias); - updateSeedNodes(connection, seedNodes); - expectThrows(RemoteTransportException.class, () -> getRemoteConnectionInfo(connection)); - - for (MockTransportService s : Arrays.asList(transport1, transport2, transport3)) { - installNodeStatsHandler(s, node1, node2, node3); - } - remoteConnectionInfo = getRemoteConnectionInfo(connection); - remoteConnectionInfo = assertSerialization(remoteConnectionInfo); + // Connect some nodes + updateSeedNodes(connection, seedNodes); + remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); assertNotNull(remoteConnectionInfo); assertEquals(connection.getNumNodesConnected(), remoteConnectionInfo.numNodesConnected); assertEquals(Math.min(3, maxNumConnections), connection.getNumNodesConnected()); assertEquals(3, remoteConnectionInfo.seedNodes.size()); - assertEquals(remoteConnectionInfo.httpAddresses.size(), Math.min(3, maxNumConnections)); assertEquals(maxNumConnections, remoteConnectionInfo.connectionsPerCluster); assertEquals("test-cluster", remoteConnectionInfo.clusterAlias); - for (TransportAddress address : remoteConnectionInfo.httpAddresses) { - assertTrue("port range mismatch: " + address.getPort(), address.getPort() >= 80 && address.getPort() <= 90); - } } } } @@ -789,48 +764,41 @@ public void testGetConnectionInfo() throws Exception { public void testRemoteConnectionInfo() throws IOException { RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster", Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), 4, 3, TimeValue.timeValueMinutes(30), false); assertSerialization(stats); RemoteConnectionInfo stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), 4, 4, TimeValue.timeValueMinutes(30), true); assertSerialization(stats1); assertNotEquals(stats, stats1); stats1 = new RemoteConnectionInfo("test_cluster_1", Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), 4, 3, TimeValue.timeValueMinutes(30), false); assertSerialization(stats1); assertNotEquals(stats, stats1); stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 15)), - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), 4, 3, TimeValue.timeValueMinutes(30), false); assertSerialization(stats1); assertNotEquals(stats, stats1); stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 87)), 4, 3, TimeValue.timeValueMinutes(30), true); assertSerialization(stats1); assertNotEquals(stats, stats1); stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), 4, 3, TimeValue.timeValueMinutes(325), true); assertSerialization(stats1); assertNotEquals(stats, stats1); stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), 5, 3, TimeValue.timeValueMinutes(30), false); assertSerialization(stats1); assertNotEquals(stats, stats1); @@ -850,13 +818,14 @@ private static RemoteConnectionInfo assertSerialization(RemoteConnectionInfo inf } public void testRemoteConnectionInfoBwComp() throws IOException { - final Version version = VersionUtils.randomVersionBetween(random(), Version.V_5_6_5, Version.V_6_0_0); + final Version version = VersionUtils.randomVersionBetween(random(), + Version.V_6_1_0, VersionUtils.getPreviousVersion(Version.V_7_0_0_alpha1)); RemoteConnectionInfo expected = new RemoteConnectionInfo("test_cluster", Collections.singletonList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), - Collections.singletonList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), 4, 4, new TimeValue(30, TimeUnit.MINUTES), false); - String encoded = "AQQAAAAABzAuMC4wLjAAAAABAQQAAAAABzAuMC4wLjAAAABQBDwEBAx0ZXN0X2NsdXN0ZXIAAAAAAAAAAAAAAA=="; + // This version was created using the serialization code in use from 6.1 but before 7.0 + String encoded = "AQQAAAAABzAuMC4wLjAAAAABAQQAAAAABzAuMC4wLjAAAABQBDwEBAx0ZXN0X2NsdXN0ZXIA"; final byte[] data = Base64.getDecoder().decode(encoded); try (StreamInput in = StreamInput.wrap(data)) { @@ -879,55 +848,29 @@ public void testRemoteConnectionInfoBwComp() throws IOException { public void testRenderConnectionInfoXContent() throws IOException { RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster", Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1)), - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,80)), 4, 3, TimeValue.timeValueMinutes(30), true); stats = assertSerialization(stats); XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); stats.toXContent(builder, null); builder.endObject(); - assertEquals("{\"test_cluster\":{\"seeds\":[\"0.0.0.0:1\"],\"http_addresses\":[\"0.0.0.0:80\"],\"connected\":true," + + assertEquals("{\"test_cluster\":{\"seeds\":[\"0.0.0.0:1\"],\"connected\":true," + "\"num_nodes_connected\":3,\"max_connections_per_cluster\":4,\"initial_connect_timeout\":\"30m\"," + "\"skip_unavailable\":true}}", Strings.toString(builder)); stats = new RemoteConnectionInfo("some_other_cluster", Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1), new TransportAddress(TransportAddress.META_ADDRESS,2)), - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,80), new TransportAddress(TransportAddress.META_ADDRESS,81)), 2, 0, TimeValue.timeValueSeconds(30), false); stats = assertSerialization(stats); builder = XContentFactory.jsonBuilder(); builder.startObject(); stats.toXContent(builder, null); builder.endObject(); - assertEquals("{\"some_other_cluster\":{\"seeds\":[\"0.0.0.0:1\",\"0.0.0.0:2\"],\"http_addresses\":[\"0.0.0.0:80\",\"0.0.0.0:81\"]," + assertEquals("{\"some_other_cluster\":{\"seeds\":[\"0.0.0.0:1\",\"0.0.0.0:2\"]," + "\"connected\":false,\"num_nodes_connected\":0,\"max_connections_per_cluster\":2,\"initial_connect_timeout\":\"30s\"," + "\"skip_unavailable\":false}}", Strings.toString(builder)); } - private RemoteConnectionInfo getRemoteConnectionInfo(RemoteClusterConnection connection) throws Exception { - AtomicReference statsRef = new AtomicReference<>(); - AtomicReference exceptionRef = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); - connection.getConnectionInfo(new ActionListener() { - @Override - public void onResponse(RemoteConnectionInfo remoteConnectionInfo) { - statsRef.set(remoteConnectionInfo); - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - exceptionRef.set(e); - latch.countDown(); - } - }); - latch.await(); - if (exceptionRef.get() != null) { - throw exceptionRef.get(); - } - return statsRef.get(); - } - public void testEnsureConnected() throws IOException, InterruptedException { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);