Skip to content

Commit

Permalink
[Zen2] Add HandshakingTransportAddressConnector (#32643)
Browse files Browse the repository at this point in the history
The `PeerFinder`, introduced in #32246, needs to be able to identify, and
connect to, a remote master node using only its `TransportAddress`. This can be
done by opening a single-channel connection to the address, performing a
handshake, and only then forming a full-blown connection to the node. This
change implements this logic.
  • Loading branch information
DaveCTurner authored Aug 7, 2018
1 parent 2176184 commit 289e34a
Show file tree
Hide file tree
Showing 3 changed files with 300 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.discovery.PeerFinder.TransportAddressConnector;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.Transport.Connection;
import org.elasticsearch.transport.TransportRequestOptions.Type;
import org.elasticsearch.transport.TransportService;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;

public class HandshakingTransportAddressConnector extends AbstractComponent implements TransportAddressConnector {

// connection timeout for probes
public static final Setting<TimeValue> PROBE_CONNECT_TIMEOUT_SETTING =
Setting.timeSetting("discovery.probe.connect_timeout",
TimeValue.timeValueMillis(3000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
// handshake timeout for probes
public static final Setting<TimeValue> PROBE_HANDSHAKE_TIMEOUT_SETTING =
Setting.timeSetting("discovery.probe.handshake_timeout",
TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);

private final TransportService transportService;
private final TimeValue probeConnectTimeout;
private final TimeValue probeHandshakeTimeout;

public HandshakingTransportAddressConnector(Settings settings, TransportService transportService) {
super(settings);
this.transportService = transportService;
probeConnectTimeout = PROBE_CONNECT_TIMEOUT_SETTING.get(settings);
probeHandshakeTimeout = PROBE_HANDSHAKE_TIMEOUT_SETTING.get(settings);
}

@Override
public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener<DiscoveryNode> listener) {
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {

// TODO if transportService is already connected to this address then skip the handshaking

final DiscoveryNode targetNode = new DiscoveryNode(transportAddress.toString(), transportAddress, emptyMap(),
emptySet(), Version.CURRENT.minimumCompatibilityVersion());

logger.trace("[{}] opening probe connection", this);
final Connection connection = transportService.openConnection(targetNode,
ConnectionProfile.buildSingleChannelProfile(Type.REG, probeConnectTimeout, probeHandshakeTimeout));
logger.trace("[{}] opened probe connection", this);

final DiscoveryNode remoteNode;
try {
remoteNode = transportService.handshake(connection, probeHandshakeTimeout.millis());
// success means (amongst other things) that the cluster names match
logger.trace("[{}] handshake successful: {}", this, remoteNode);
} finally {
IOUtils.closeWhileHandlingException(connection);
}

if (remoteNode.equals(transportService.getLocalNode())) {
// TODO cache this result for some time? forever?
listener.onFailure(new ConnectTransportException(remoteNode, "local node found"));
} else if (remoteNode.isMasterNode() == false) {
// TODO cache this result for some time?
listener.onFailure(new ConnectTransportException(remoteNode, "non-master-eligible node found"));
} else {
transportService.connectToNode(remoteNode);
logger.trace("[{}] full connection successful: {}", this, remoteNode);
listener.onResponse(remoteNode);
}
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public String toString() {
return "connectToRemoteMasterNode[" + transportAddress + "]";
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportService.HandshakeResponse;
import org.junit.After;
import org.junit.Before;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
import static org.elasticsearch.discovery.HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.equalTo;

/**
 * Tests for {@link HandshakingTransportAddressConnector}. The connector runs against a {@link CapturingTransport} whose reply to
 * each handshake request is scripted by the {@code remoteNode}, {@code remoteClusterName} and {@code dropHandshake} fields, which
 * each test sets before starting a connection attempt.
 */
public class HandshakingTransportAddressConnectorTests extends ESTestCase {

    // Node (and cluster name) that the fake transport reports in its handshake response.
    private DiscoveryNode remoteNode;
    private String remoteClusterName;
    // When true the fake transport never answers the handshake.
    private boolean dropHandshake;

    private DiscoveryNode localNode;
    private ThreadPool threadPool;
    private TransportService transportService;
    private HandshakingTransportAddressConnector handshakingTransportAddressConnector;

    @Before
    public void startServices() {
        localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
        final Settings settings = Settings.builder()
            .put(NODE_NAME_SETTING.getKey(), "node")
            .put(CLUSTER_NAME_SETTING.getKey(), "local-cluster")
            .build();

        // Reset the handshake script; each test fills it in.
        remoteNode = null;
        remoteClusterName = null;
        dropHandshake = false;

        threadPool = new TestThreadPool("node", settings);

        transportService = new TransportService(settings, newHandshakeScriptedTransport(), threadPool,
            TransportService.NOOP_TRANSPORT_INTERCEPTOR, address -> localNode, null, emptySet());
        transportService.start();
        transportService.acceptIncomingRequests();

        handshakingTransportAddressConnector = new HandshakingTransportAddressConnector(settings, transportService);
    }

    @After
    public void stopServices() throws InterruptedException {
        transportService.stop();
        terminate(threadPool);
    }

    /**
     * A handshake answered with a node in the local cluster, built with the three-argument constructor, yields that node.
     */
    public void testConnectsToMasterNode() throws InterruptedException {
        final CountDownLatch done = new CountDownLatch(1);
        final SetOnce<DiscoveryNode> connectedNode = new SetOnce<>();

        remoteNode = newRemoteNode();
        remoteClusterName = "local-cluster";

        handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), new ActionListener<DiscoveryNode>() {
            @Override
            public void onResponse(DiscoveryNode discoveryNode) {
                connectedNode.set(discoveryNode);
                done.countDown();
            }

            @Override
            public void onFailure(Exception e) {
                throw new AssertionError(e);
            }
        });

        assertTrue(done.await(30, TimeUnit.SECONDS));
        assertEquals(remoteNode, connectedNode.get());
    }

    /**
     * The remote node is built with an explicitly empty map and set (presumably attributes and roles), so it is expected to be
     * rejected as not master-eligible.
     */
    public void testDoesNotConnectToNonMasterNode() throws InterruptedException {
        remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
        remoteClusterName = "local-cluster";

        assertConnectionAttemptFails();
    }

    /**
     * A handshake that reports the local node itself must fail the attempt.
     */
    public void testDoesNotConnectToLocalNode() throws Exception {
        remoteNode = localNode;
        remoteClusterName = "local-cluster";

        assertConnectionAttemptFails();
    }

    /**
     * A handshake that reports a different cluster name must fail the attempt.
     */
    public void testDoesNotConnectToDifferentCluster() throws InterruptedException {
        remoteNode = newRemoteNode();
        remoteClusterName = "another-cluster";

        assertConnectionAttemptFails();
    }

    /**
     * With the handshake response dropped, the attempt must fail; the test first waits for the default probe handshake timeout.
     */
    public void testHandshakeTimesOut() throws InterruptedException {
        remoteNode = newRemoteNode();
        remoteClusterName = "local-cluster";
        dropHandshake = true;

        final FailureListener failureListener = new FailureListener();
        handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
        Thread.sleep(PROBE_HANDSHAKE_TIMEOUT_SETTING.get(Settings.EMPTY).millis());
        failureListener.assertFailure();
    }

    /**
     * Builds the fake transport: it checks that every request is a handshake addressed to {@code remoteNode}'s address (but not
     * to {@code remoteNode} itself), then answers according to the handshake script unless {@code dropHandshake} is set.
     */
    private CapturingTransport newHandshakeScriptedTransport() {
        return new CapturingTransport() {
            @Override
            protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
                super.onSendRequest(requestId, action, request, node);
                assertThat(action, equalTo(TransportService.HANDSHAKE_ACTION_NAME));
                assertEquals(remoteNode.getAddress(), node.getAddress());
                assertNotEquals(remoteNode, node);
                if (dropHandshake) {
                    return;
                }
                final HandshakeResponse scriptedResponse =
                    new HandshakeResponse(remoteNode, new ClusterName(remoteClusterName), Version.CURRENT);
                handleResponse(requestId, scriptedResponse);
            }
        };
    }

    // Creates a "remote-node" at a fresh fake address using the three-argument DiscoveryNode constructor.
    private DiscoveryNode newRemoteNode() {
        return new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT);
    }

    // Starts a connection attempt to the scripted remote node and waits for it to report failure.
    private void assertConnectionAttemptFails() throws InterruptedException {
        final FailureListener failureListener = new FailureListener();
        handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
        failureListener.assertFailure();
    }

    /**
     * Listener that expects the attempt to fail: a response fails the test, a failure releases the latch that
     * {@link #assertFailure()} waits on.
     */
    private class FailureListener implements ActionListener<DiscoveryNode> {
        final CountDownLatch completionLatch = new CountDownLatch(1);

        @Override
        public void onResponse(DiscoveryNode discoveryNode) {
            fail(discoveryNode.toString());
        }

        @Override
        public void onFailure(Exception e) {
            completionLatch.countDown();
        }

        void assertFailure() throws InterruptedException {
            assertTrue(completionLatch.await(30, TimeUnit.SECONDS));
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,18 +204,21 @@ public DiscoveryNode getNode() {

@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
requests.put(requestId, Tuple.tuple(node, action));
capturedRequests.add(new CapturedRequest(node, requestId, action, request));
throws TransportException {
onSendRequest(requestId, action, request, node);
}

@Override
public void close() throws IOException {

public void close() {
}
};
}

/**
 * Records an outgoing request: stores the target node and action under {@code requestId} in {@code requests}, then appends a
 * {@code CapturedRequest} describing it to {@code capturedRequests}. Protected, so a subclass may override it to observe or
 * react to requests as they are sent.
 *
 * @param requestId id of the request being sent
 * @param action    name of the transport action
 * @param request   the request payload
 * @param node      the node the request is addressed to
 */
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
requests.put(requestId, Tuple.tuple(node, action));
capturedRequests.add(new CapturedRequest(node, requestId, action, request));
}

@Override
public TransportStats getStats() {
throw new UnsupportedOperationException();
Expand Down

0 comments on commit 289e34a

Please sign in to comment.