Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transport client: Don't validate node in handshake (#30737) #31080

Merged
merged 1 commit into from
Jun 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
Expand Down Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -89,6 +90,7 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
private final Object mutex = new Object();

private volatile List<DiscoveryNode> nodes = Collections.emptyList();
// Filtered nodes are nodes whose cluster name does not match the configured cluster name
private volatile List<DiscoveryNode> filteredNodes = Collections.emptyList();

private final AtomicInteger tempNodeIdGenerator = new AtomicInteger();
Expand Down Expand Up @@ -268,7 +270,7 @@ public static class RetryListener<Response> implements ActionListener<Response>
private volatile int i;

RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener,
List<DiscoveryNode> nodes, int index, TransportClient.HostFailureListener hostFailureListener) {
List<DiscoveryNode> nodes, int index, TransportClient.HostFailureListener hostFailureListener) {
this.callback = callback;
this.listener = listener;
this.nodes = nodes;
Expand Down Expand Up @@ -361,10 +363,10 @@ public void sample() {
protected abstract void doSample();

/**
* validates a set of potentially newly discovered nodes and returns an immutable
* list of the nodes that has passed.
* Establishes the node connections. If validateInHandshake is set to true, the connection will fail if
* node returned in the handshake response is different than the discovery node.
*/
protected List<DiscoveryNode> validateNewNodes(Set<DiscoveryNode> nodes) {
List<DiscoveryNode> establishNodeConnections(Set<DiscoveryNode> nodes) {
for (Iterator<DiscoveryNode> it = nodes.iterator(); it.hasNext(); ) {
DiscoveryNode node = it.next();
if (!transportService.nodeConnected(node)) {
Expand All @@ -380,7 +382,6 @@ protected List<DiscoveryNode> validateNewNodes(Set<DiscoveryNode> nodes) {

return Collections.unmodifiableList(new ArrayList<>(nodes));
}

}

class ScheduledNodeSampler implements Runnable {
Expand All @@ -402,14 +403,16 @@ class SimpleNodeSampler extends NodeSampler {
@Override
protected void doSample() {
HashSet<DiscoveryNode> newNodes = new HashSet<>();
HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
ArrayList<DiscoveryNode> newFilteredNodes = new ArrayList<>();
for (DiscoveryNode listedNode : listedNodes) {
try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)){
final PlainTransportFuture<LivenessResponse> handler = new PlainTransportFuture<>(
new FutureTransportResponseHandler<LivenessResponse>() {
@Override
public LivenessResponse newInstance() {
return new LivenessResponse();
public LivenessResponse read(StreamInput in) throws IOException {
LivenessResponse response = new LivenessResponse();
response.readFrom(in);
return response;
}
});
transportService.sendRequest(connection, TransportLivenessAction.NAME, new LivenessRequest(),
Expand All @@ -435,8 +438,8 @@ public LivenessResponse newInstance() {
}
}

nodes = validateNewNodes(newNodes);
filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
nodes = establishNodeConnections(newNodes);
filteredNodes = Collections.unmodifiableList(newFilteredNodes);
}
}

Expand Down Expand Up @@ -557,7 +560,7 @@ public void handleException(TransportException e) {
}
}

nodes = validateNewNodes(newNodes);
nodes = establishNodeConnections(newNodes);
filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
Expand Down Expand Up @@ -124,6 +126,8 @@ protected boolean removeEldestEntry(Map.Entry eldest) {

private final RemoteClusterService remoteClusterService;

private final boolean validateConnections;

/** if set will call requests sent to this id to shortcut and executed locally */
volatile DiscoveryNode localNode = null;
private final Transport.Connection localNodeConnection = new Transport.Connection() {
Expand Down Expand Up @@ -153,6 +157,9 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
Set<String> taskHeaders) {
super(settings);
// The only time we do not want to validate node connections is when this is a transport client using the simple node sampler
this.validateConnections = TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey())) == false ||
TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings);
this.transport = transport;
this.threadPool = threadPool;
this.localNodeFactory = localNodeFactory;
Expand Down Expand Up @@ -314,6 +321,11 @@ public boolean nodeConnected(DiscoveryNode node) {
return isLocalNode(node) || transport.nodeConnected(node);
}

/**
* Connect to the specified node with the default connection profile
*
* @param node the node to connect to
*/
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
connectToNode(node, null);
}
Expand All @@ -331,7 +343,7 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
transport.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> {
// We don't validate cluster names to allow for tribe node connections.
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true);
if (node.equals(remote) == false) {
if (validateConnections && node.equals(remote) == false) {
throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.elasticsearch.transport;

import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
Expand Down Expand Up @@ -178,6 +180,42 @@ public void testNodeConnectWithDifferentNodeId() {
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
}

public void testNodeConnectWithDifferentNodeIdSucceedsIfThisIsTransportClientOfSimpleNodeSampler() {
Settings.Builder settings = Settings.builder().put("cluster.name", "test");
Settings transportClientSettings = settings.put(Client.CLIENT_TYPE_SETTING_S.getKey(), TransportClient.CLIENT_TYPE).build();
NetworkHandle handleA = startServices("TS_A", transportClientSettings, Version.CURRENT);
NetworkHandle handleB = startServices("TS_B", settings.build(), Version.CURRENT);
DiscoveryNode discoveryNode = new DiscoveryNode(
randomAlphaOfLength(10),
handleB.discoveryNode.getAddress(),
emptyMap(),
emptySet(),
handleB.discoveryNode.getVersion());

handleA.transportService.connectToNode(discoveryNode, MockTcpTransport.LIGHT_PROFILE);
assertTrue(handleA.transportService.nodeConnected(discoveryNode));
}

public void testNodeConnectWithDifferentNodeIdFailsWhenSnifferTransportClient() {
Settings.Builder settings = Settings.builder().put("cluster.name", "test");
Settings transportClientSettings = settings.put(Client.CLIENT_TYPE_SETTING_S.getKey(), TransportClient.CLIENT_TYPE)
.put(TransportClient.CLIENT_TRANSPORT_SNIFF.getKey(), true)
.build();
NetworkHandle handleA = startServices("TS_A", transportClientSettings, Version.CURRENT);
NetworkHandle handleB = startServices("TS_B", settings.build(), Version.CURRENT);
DiscoveryNode discoveryNode = new DiscoveryNode(
randomAlphaOfLength(10),
handleB.discoveryNode.getAddress(),
emptyMap(),
emptySet(),
handleB.discoveryNode.getVersion());
ConnectTransportException ex = expectThrows(ConnectTransportException.class, () -> {
handleA.transportService.connectToNode(discoveryNode, MockTcpTransport.LIGHT_PROFILE);
});
assertThat(ex.getMessage(), containsString("unexpected remote node"));
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
}


private static class NetworkHandle {
private TransportService transportService;
Expand Down