Skip to content

Commit

Permalink
CCS: Drop http address from remote cluster info
Browse files Browse the repository at this point in the history
They are expensive to fetch and no longer needed by Kibana so they
*shouldn't* be needed by anyone else either.

Closes elastic#29207
  • Loading branch information
nik9000 committed Apr 17, 2018
1 parent 8f91743 commit 5cb9d68
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 185 deletions.
3 changes: 0 additions & 3 deletions docs/reference/cluster/remote-info.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ the configured remote cluster alias.
`seeds`::
The configured initial seed transport addresses of the remote cluster.

`http_addresses`::
The published http addresses of all connected remote nodes.

`connected`::
True if there is at least one connection to the remote cluster.

Expand Down
6 changes: 5 additions & 1 deletion docs/reference/release-notes/7.0.0-alpha1.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,8 @@ The changes listed below have been released for the first time in Elasticsearch
=== Breaking changes

Core::
* Tribe node has been removed in favor of Cross-Cluster-Search
* Tribe node has been removed in favor of Cross-Cluster-Search

Cross-Cluster-Search::
* `http_addresses` has been removed from the <<cluster-remote-info>> API
because it is expensive to fetch and no longer needed by Kibana.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import static java.util.stream.Collectors.toList;

public final class TransportRemoteInfoAction extends HandledTransportAction<RemoteInfoRequest, RemoteInfoResponse> {

private final RemoteClusterService remoteClusterService;
Expand All @@ -45,7 +47,6 @@ public TransportRemoteInfoAction(Settings settings, ThreadPool threadPool, Trans

@Override
protected void doExecute(RemoteInfoRequest remoteInfoRequest, ActionListener<RemoteInfoResponse> listener) {
remoteClusterService.getRemoteConnectionInfos(ActionListener.wrap(remoteConnectionInfos
-> listener.onResponse(new RemoteInfoResponse(remoteConnectionInfos)), listener::onFailure));
listener.onResponse(new RemoteInfoResponse(remoteClusterService.getRemoteConnectionInfos().collect(toList())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.rest.action.RestToXContentListener;

import java.io.IOException;

Expand All @@ -50,16 +51,8 @@ public String getName() {
}

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client)
throws IOException {
return channel -> client.execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest(),
new RestBuilderListener<RemoteInfoResponse>(channel) {
@Override
public RestResponse buildResponse(RemoteInfoResponse response, XContentBuilder builder) throws Exception {
response.toXContent(builder, request);
return new BytesRestResponse(RestStatus.OK, builder);
}
});
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
return channel -> client.execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest(), new RestToXContentListener<>(channel));
}
@Override
public boolean canTripCircuitBreaker() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
Expand Down Expand Up @@ -602,66 +603,13 @@ void addConnectedNode(DiscoveryNode node) {
}

/**
* Fetches connection info for this connection
* Get the information about remote nodes to be rendered on {@code _remote/info} requests.
*/
public void getConnectionInfo(ActionListener<RemoteConnectionInfo> listener) {
final Optional<DiscoveryNode> anyNode = connectedNodes.getAny();
if (anyNode.isPresent() == false) {
// not connected we return immediately
RemoteConnectionInfo remoteConnectionStats = new RemoteConnectionInfo(clusterAlias,
Collections.emptyList(), Collections.emptyList(), maxNumRemoteConnections, 0,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings), skipUnavailable);
listener.onResponse(remoteConnectionStats);
} else {
NodesInfoRequest request = new NodesInfoRequest();
request.clear();
request.http(true);

transportService.sendRequest(anyNode.get(), NodesInfoAction.NAME, request, new TransportResponseHandler<NodesInfoResponse>() {
@Override
public NodesInfoResponse newInstance() {
return new NodesInfoResponse();
}

@Override
public void handleResponse(NodesInfoResponse response) {
Collection<TransportAddress> httpAddresses = new HashSet<>();
for (NodeInfo info : response.getNodes()) {
if (connectedNodes.contains(info.getNode()) && info.getHttp() != null) {
httpAddresses.add(info.getHttp().getAddress().publishAddress());
}
}

if (httpAddresses.size() < maxNumRemoteConnections) {
// just in case non of the connected nodes have http enabled we get other http enabled nodes instead.
for (NodeInfo info : response.getNodes()) {
if (nodePredicate.test(info.getNode()) && info.getHttp() != null) {
httpAddresses.add(info.getHttp().getAddress().publishAddress());
}
if (httpAddresses.size() == maxNumRemoteConnections) {
break; // once we have enough return...
}
}
}
RemoteConnectionInfo remoteConnectionInfo = new RemoteConnectionInfo(clusterAlias,
seedNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toList()), new ArrayList<>(httpAddresses),
maxNumRemoteConnections, connectedNodes.size(),
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings), skipUnavailable);
listener.onResponse(remoteConnectionInfo);
}

@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}

public RemoteConnectionInfo getConnectionInfo() {
List<TransportAddress> seedNodeAddresses = seedNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toList());
TimeValue initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
return new RemoteConnectionInfo(clusterAlias, seedNodeAddresses, maxNumRemoteConnections, connectedNodes.size(),
initialConnectionTimeout, skipUnavailable);
}

int getNumNodesConnected() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -56,6 +57,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.common.settings.Setting.boolSetting;

Expand Down Expand Up @@ -348,17 +350,8 @@ public void close() throws IOException {
IOUtils.close(remoteClusters.values());
}

public void getRemoteConnectionInfos(ActionListener<Collection<RemoteConnectionInfo>> listener) {
final Map<String, RemoteClusterConnection> remoteClusters = this.remoteClusters;
if (remoteClusters.isEmpty()) {
listener.onResponse(Collections.emptyList());
} else {
final GroupedActionListener<RemoteConnectionInfo> actionListener = new GroupedActionListener<>(listener,
remoteClusters.size(), Collections.emptyList());
for (RemoteClusterConnection connection : remoteClusters.values()) {
connection.getConnectionInfo(actionListener);
}
}
public Stream<RemoteConnectionInfo> getRemoteConnectionInfos() {
return remoteClusters.values().stream().map(RemoteClusterConnection::getConnectionInfo);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,29 @@
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;

import static java.util.Collections.emptyList;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

/**
* This class encapsulates all remote cluster information to be rendered on
* <tt>_remote/info</tt> requests.
* {@code _remote/info} requests.
*/
public final class RemoteConnectionInfo implements ToXContentFragment, Writeable {
final List<TransportAddress> seedNodes;
final List<TransportAddress> httpAddresses;
final int connectionsPerCluster;
final TimeValue initialConnectionTimeout;
final int numNodesConnected;
final String clusterAlias;
final boolean skipUnavailable;

RemoteConnectionInfo(String clusterAlias, List<TransportAddress> seedNodes,
List<TransportAddress> httpAddresses,
int connectionsPerCluster, int numNodesConnected,
TimeValue initialConnectionTimeout, boolean skipUnavailable) {
this.clusterAlias = clusterAlias;
this.seedNodes = seedNodes;
this.httpAddresses = httpAddresses;
this.connectionsPerCluster = connectionsPerCluster;
this.numNodesConnected = numNodesConnected;
this.initialConnectionTimeout = initialConnectionTimeout;
Expand All @@ -59,16 +58,45 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable

public RemoteConnectionInfo(StreamInput input) throws IOException {
seedNodes = input.readList(TransportAddress::new);
httpAddresses = input.readList(TransportAddress::new);
if (input.getVersion().before(Version.V_7_0_0_alpha1)) {
/*
* Versions before 7.0 sent the HTTP addresses of all nodes in the
* remote cluster here but it was expensive to fetch and we
* ultimately figured out how to do without it. So we removed it.
*
* We just throw any HTTP addresses received here on the floor
* because we don't need to do anything with them.
*/
input.readList(TransportAddress::new);
}
connectionsPerCluster = input.readVInt();
initialConnectionTimeout = input.readTimeValue();
numNodesConnected = input.readVInt();
clusterAlias = input.readString();
if (input.getVersion().onOrAfter(Version.V_6_1_0)) {
skipUnavailable = input.readBoolean();
} else {
skipUnavailable = false;
skipUnavailable = input.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(seedNodes);
if (out.getVersion().before(Version.V_7_0_0_alpha1)) {
/*
* Versions before 7.0 sent the HTTP addresses of all nodes in the
* remote cluster here but it was expensive to fetch and we
* ultimately figured out how to do without it. So we removed it.
*
* When sending this request to a node that expects HTTP addresses
* here we pretend that we didn't find any. This *should* be fine
* because, after all, we haven't been using this information for
* a while.
*/
out.writeList(emptyList());
}
out.writeVInt(connectionsPerCluster);
out.writeTimeValue(initialConnectionTimeout);
out.writeVInt(numNodesConnected);
out.writeString(clusterAlias);
out.writeBoolean(skipUnavailable);
}

@Override
Expand All @@ -80,11 +108,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.value(addr.toString());
}
builder.endArray();
builder.startArray("http_addresses");
for (TransportAddress addr : httpAddresses) {
builder.value(addr.toString());
}
builder.endArray();
builder.field("connected", numNodesConnected > 0);
builder.field("num_nodes_connected", numNodesConnected);
builder.field("max_connections_per_cluster", connectionsPerCluster);
Expand All @@ -95,19 +118,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(seedNodes);
out.writeList(httpAddresses);
out.writeVInt(connectionsPerCluster);
out.writeTimeValue(initialConnectionTimeout);
out.writeVInt(numNodesConnected);
out.writeString(clusterAlias);
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeBoolean(skipUnavailable);
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -116,15 +126,14 @@ public boolean equals(Object o) {
return connectionsPerCluster == that.connectionsPerCluster &&
numNodesConnected == that.numNodesConnected &&
Objects.equals(seedNodes, that.seedNodes) &&
Objects.equals(httpAddresses, that.httpAddresses) &&
Objects.equals(initialConnectionTimeout, that.initialConnectionTimeout) &&
Objects.equals(clusterAlias, that.clusterAlias) &&
skipUnavailable == that.skipUnavailable;
}

@Override
public int hashCode() {
return Objects.hash(seedNodes, httpAddresses, connectionsPerCluster, initialConnectionTimeout,
return Objects.hash(seedNodes, connectionsPerCluster, initialConnectionTimeout,
numNodesConnected, clusterAlias, skipUnavailable);
}
}
Loading

0 comments on commit 5cb9d68

Please sign in to comment.