From 289e34aeedc185e83ea03b39b9c59666d849b0bb Mon Sep 17 00:00:00 2001
From: David Turner
Date: Tue, 7 Aug 2018 13:34:07 +0100
Subject: [PATCH] [Zen2] Add HandshakingTransportAddressConnector (#32643)

The `PeerFinder`, introduced in #32246, needs to be able to identify, and
connect to, a remote master node using only its `TransportAddress`. This can be
done by opening a single-channel connection to the address, performing a
handshake, and only then forming a full-blown connection to the node. This
change implements this logic.
---
 .../HandshakingTransportAddressConnector.java | 129 +++++++++++++
 ...shakingTransportAddressConnectorTests.java | 182 ++++++++++++++++++
 .../test/transport/CapturingTransport.java    |  14 +-
 3 files changed, 320 insertions(+), 5 deletions(-)
 create mode 100644 server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java
 create mode 100644 server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java

diff --git a/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java b/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java
new file mode 100644
index 0000000000000..788689b898a75
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.discovery;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.discovery.PeerFinder.TransportAddressConnector;
+import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.ConnectionProfile;
+import org.elasticsearch.transport.Transport.Connection;
+import org.elasticsearch.transport.TransportRequestOptions.Type;
+import org.elasticsearch.transport.TransportService;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+
+/**
+ * A {@link TransportAddressConnector} that works out which node is listening at a {@link TransportAddress} before
+ * connecting to it: it opens a single-channel probe connection to the address, handshakes over it to learn the
+ * identity of the remote node, closes the probe, and then forms a full connection only if the remote node is
+ * master-eligible and is not the local node.
+ */
+public class HandshakingTransportAddressConnector extends AbstractComponent implements TransportAddressConnector {
+
+    /** Timeout for opening the probe connection: defaults to 3000ms, and must be at least 1ms. */
+    public static final Setting<TimeValue> PROBE_CONNECT_TIMEOUT_SETTING =
+        Setting.timeSetting("discovery.probe.connect_timeout",
+            TimeValue.timeValueMillis(3000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
+    /** Timeout for the handshake over the probe connection: defaults to 1000ms, and must be at least 1ms. */
+    public static final Setting<TimeValue> PROBE_HANDSHAKE_TIMEOUT_SETTING =
+        Setting.timeSetting("discovery.probe.handshake_timeout",
+            TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
+
+    private final TransportService transportService;
+    private final TimeValue probeConnectTimeout;
+    private final TimeValue probeHandshakeTimeout;
+
+    public HandshakingTransportAddressConnector(Settings settings, TransportService transportService) {
+        super(settings);
+        this.transportService = transportService;
+        probeConnectTimeout = PROBE_CONNECT_TIMEOUT_SETTING.get(settings);
+        probeHandshakeTimeout = PROBE_HANDSHAKE_TIMEOUT_SETTING.get(settings);
+    }
+
+    /**
+     * Probes the given address on the generic thread pool and, if the node found there is master-eligible and is not
+     * the local node, connects to it.
+     *
+     * @param transportAddress the address to probe
+     * @param listener receives the discovered node once the full connection is established; otherwise receives the
+     *                 failure, which includes the cases where the node found is the local node or is not
+     *                 master-eligible
+     */
+    @Override
+    public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener<DiscoveryNode> listener) {
+        transportService.getThreadPool().generic().execute(new AbstractRunnable() {
+            @Override
+            protected void doRun() throws Exception {
+
+                // TODO if transportService is already connected to this address then skip the handshaking
+
+                final DiscoveryNode targetNode = new DiscoveryNode(transportAddress.toString(), transportAddress, emptyMap(),
+                    emptySet(), Version.CURRENT.minimumCompatibilityVersion());
+
+                logger.trace("[{}] opening probe connection", this);
+                final Connection connection = transportService.openConnection(targetNode,
+                    ConnectionProfile.buildSingleChannelProfile(Type.REG, probeConnectTimeout, probeHandshakeTimeout));
+                logger.trace("[{}] opened probe connection", this);
+
+                final DiscoveryNode remoteNode;
+                try {
+                    remoteNode = transportService.handshake(connection, probeHandshakeTimeout.millis());
+                    // success means (amongst other things) that the cluster names match
+                    logger.trace("[{}] handshake successful: {}", this, remoteNode);
+                } finally {
+                    // the probe connection is only needed for the handshake, so close it whatever the outcome
+                    IOUtils.closeWhileHandlingException(connection);
+                }
+
+                if (remoteNode.equals(transportService.getLocalNode())) {
+                    // TODO cache this result for some time? forever?
+                    listener.onFailure(new ConnectTransportException(remoteNode, "local node found"));
+                } else if (remoteNode.isMasterNode() == false) {
+                    // TODO cache this result for some time?
+                    listener.onFailure(new ConnectTransportException(remoteNode, "non-master-eligible node found"));
+                } else {
+                    transportService.connectToNode(remoteNode);
+                    logger.trace("[{}] full connection successful: {}", this, remoteNode);
+                    listener.onResponse(remoteNode);
+                }
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                listener.onFailure(e);
+            }
+
+            @Override
+            public String toString() {
+                return "connectToRemoteMasterNode[" + transportAddress + "]";
+            }
+        });
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java b/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java
new file mode 100644
index 0000000000000..5e71a6c31e50a
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.discovery;
+
+import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.transport.CapturingTransport;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportRequest;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.transport.TransportService.HandshakeResponse;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
+import static org.elasticsearch.discovery.HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING;
+import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
+import static org.hamcrest.Matchers.equalTo;
+
+public class HandshakingTransportAddressConnectorTests extends ESTestCase {
+
+    private DiscoveryNode remoteNode;
+    private TransportService transportService;
+    private ThreadPool threadPool;
+    private String remoteClusterName;
+    private HandshakingTransportAddressConnector handshakingTransportAddressConnector;
+    private DiscoveryNode localNode;
+
+    // when set, the fake transport does not respond to the handshake
+    private boolean dropHandshake;
+
+    @Before
+    public void startServices() {
+        localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
+        final Settings settings = Settings.builder()
+            .put(NODE_NAME_SETTING.getKey(), "node")
+            .put(CLUSTER_NAME_SETTING.getKey(), "local-cluster")
+            .build();
+        threadPool = new TestThreadPool("node", settings);
+
+        remoteNode = null;
+        remoteClusterName = null;
+        dropHandshake = false;
+
+        // fake transport which checks each outgoing request and answers the handshake on behalf of remoteNode
+        final CapturingTransport capturingTransport = new CapturingTransport() {
+            @Override
+            protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
+                super.onSendRequest(requestId, action, request, node);
+                assertThat(action, equalTo(TransportService.HANDSHAKE_ACTION_NAME));
+                assertEquals(remoteNode.getAddress(), node.getAddress());
+                assertNotEquals(remoteNode, node);
+                if (dropHandshake == false) {
+                    handleResponse(requestId, new HandshakeResponse(remoteNode, new ClusterName(remoteClusterName), Version.CURRENT));
+                }
+            }
+        };
+
+        transportService = new TransportService(settings, capturingTransport, threadPool,
+            TransportService.NOOP_TRANSPORT_INTERCEPTOR, address -> localNode, null, emptySet());
+
+        transportService.start();
+        transportService.acceptIncomingRequests();
+
+        handshakingTransportAddressConnector = new HandshakingTransportAddressConnector(settings, transportService);
+    }
+
+    @After
+    public void stopServices() throws InterruptedException {
+        transportService.stop();
+        terminate(threadPool);
+    }
+
+    public void testConnectsToMasterNode() throws InterruptedException {
+        final CountDownLatch completionLatch = new CountDownLatch(1);
+        final SetOnce<DiscoveryNode> receivedNode = new SetOnce<>();
+
+        remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT);
+        remoteClusterName = "local-cluster";
+
+        handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), new ActionListener<DiscoveryNode>() {
+            @Override
+            public void onResponse(DiscoveryNode discoveryNode) {
+                receivedNode.set(discoveryNode);
+                completionLatch.countDown();
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                throw new AssertionError(e);
+            }
+        });
+
+        assertTrue(completionLatch.await(30, TimeUnit.SECONDS));
+        assertEquals(remoteNode, receivedNode.get());
+    }
+
+    public void testDoesNotConnectToNonMasterNode() throws InterruptedException {
+        remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
+        remoteClusterName = "local-cluster";
+
+        FailureListener failureListener = new FailureListener();
+        handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
+        failureListener.assertFailure();
+    }
+
+    public void testDoesNotConnectToLocalNode() throws Exception {
+        remoteNode = localNode;
+        remoteClusterName = "local-cluster";
+
+        FailureListener failureListener = new FailureListener();
+        handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
+        failureListener.assertFailure();
+    }
+
+    public void testDoesNotConnectToDifferentCluster() throws InterruptedException {
+        remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT);
+        remoteClusterName = "another-cluster";
+
+        FailureListener failureListener = new FailureListener();
+        handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
+        failureListener.assertFailure();
+    }
+
+    public void testHandshakeTimesOut() throws InterruptedException {
+        remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT);
+        remoteClusterName = "local-cluster";
+        dropHandshake = true;
+
+        FailureListener failureListener = new FailureListener();
+        handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
+        Thread.sleep(PROBE_HANDSHAKE_TIMEOUT_SETTING.get(Settings.EMPTY).millis());
+        failureListener.assertFailure();
+    }
+
+    /** Listener for tests that expect failure: a response fails the test and assertFailure() awaits the failure. */
+    private static class FailureListener implements ActionListener<DiscoveryNode> {
+        final CountDownLatch completionLatch = new CountDownLatch(1);
+
+        @Override
+        public void onResponse(DiscoveryNode discoveryNode) {
+            fail(discoveryNode.toString());
+        }
+
+        @Override
+        public void onFailure(Exception e) {
+            completionLatch.countDown();
+        }
+
+        void assertFailure() throws InterruptedException {
+            assertTrue(completionLatch.await(30, TimeUnit.SECONDS));
+        }
+    }
+}
diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java
index ffdf79c0636b2..1ed6c02580f85 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java
@@ -204,18 +204,22 @@ public DiscoveryNode getNode() {
 
             @Override
             public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
-                throws IOException, TransportException {
-                requests.put(requestId, Tuple.tuple(node, action));
-                capturedRequests.add(new CapturedRequest(node, requestId, action, request));
+                throws TransportException {
+                onSendRequest(requestId, action, request, node);
             }
 
             @Override
-            public void close() throws IOException {
-
+            public void close() {
             }
         };
     }
 
+    /** Records the request as captured; subclasses may override this to react to it, e.g. by sending a response. */
+    protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
+        requests.put(requestId, Tuple.tuple(node, action));
+        capturedRequests.add(new CapturedRequest(node, requestId, action, request));
+    }
+
     @Override
     public TransportStats getStats() {
         throw new UnsupportedOperationException();