From 7857109e37ec957b63fccec346abd3dac1f0c8b6 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 23 Jan 2020 14:49:52 +0000 Subject: [PATCH] Log when probe succeeds but full connection fails (#51304) It is permitted for nodes to accept transport connections at addresses other than their publish address, which allows a good deal of flexibility when configuring discovery. However, it is not unusual for users to misconfigure nodes to pick a publish address which is inaccessible to other nodes. We see this happen a lot if the nodes are on different networks separated by a proxy, or if the nodes are running in Docker with the wrong kind of network config. In this case we offer no useful feedback to the user unless they enable TRACE-level logs. It's particularly tricky to diagnose because if we test connectivity between the nodes (using their discovery addresses) then all will appear well. This commit adds a WARN-level log if this kind of misconfiguration is detected: the probe connection has succeeded (to indicate that we are really talking to a healthy Elasticsearch node) but the followup connection attempt fails. It also tidies up some loose ends in `HandshakingTransportAddressConnector`, removing some TODOs that need not be completed, and registering its accidentally-unregistered timeout settings. --- .../common/settings/ClusterSettings.java | 3 + .../HandshakingTransportAddressConnector.java | 16 +++- ...shakingTransportAddressConnectorTests.java | 80 +++++++++++++++++-- 3 files changed, 87 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 37b7ccdf05414..f9adabc94d749 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -69,6 +69,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.discovery.HandshakingTransportAddressConnector; import org.elasticsearch.discovery.PeerFinder; import org.elasticsearch.discovery.SeedHostsResolver; import org.elasticsearch.discovery.SettingsBasedSeedHostsProvider; @@ -529,6 +530,8 @@ public void apply(Settings value, Settings current, Settings previous) { ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING, ClusterBootstrapService.UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING, LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING, + HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING, + HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING, DiscoveryUpgradeService.BWC_PING_TIMEOUT_SETTING, DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING))); diff --git a/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java b/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java index a9f468f0b140a..0c36c22c4d970 100644 --- a/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java +++ b/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java @@ -74,7 +74,8 @@ public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionL @Override protected void doRun() { - // TODO if transportService is already connected to this address then skip the handshaking + // We could skip this if the transportService were already connected to the given address, but the savings would be minimal + // so we open a new connection anyway. final DiscoveryNode targetNode = new DiscoveryNode("", transportAddress.toString(), UUIDs.randomBase64UUID(Randomness.get()), // generated deterministically for reproducible tests @@ -101,21 +102,28 @@ protected void innerOnResponse(DiscoveryNode remoteNode) { IOUtils.closeWhileHandlingException(connection); if (remoteNode.equals(transportService.getLocalNode())) { - // TODO cache this result for some time? forever? listener.onFailure(new ConnectTransportException(remoteNode, "local node found")); } else if (remoteNode.isMasterNode() == false) { - // TODO cache this result for some time? listener.onFailure(new ConnectTransportException(remoteNode, "non-master-eligible node found")); } else { transportService.connectToNode(remoteNode, new ActionListener() { @Override public void onResponse(Void ignored) { - logger.trace("[{}] full connection successful: {}", thisConnectionAttempt, remoteNode); + logger.trace("[{}] completed full connection with [{}]", + thisConnectionAttempt, remoteNode); listener.onResponse(remoteNode); } @Override public void onFailure(Exception e) { + // we opened a connection and successfully performed a handshake, so we're definitely + // talking to a master-eligible node with a matching cluster name and a good version, + // but the attempt to open a full connection to its publish address failed; a common + // reason is that the remote node is listening on 0.0.0.0 but has made an inappropriate + // choice for its publish address. + logger.warn(new ParameterizedMessage( + "[{}] completed handshake with [{}] but followup connection failed", + thisConnectionAttempt, remoteNode), e); listener.onFailure(e); } }); diff --git a/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java b/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java index d697cd5468620..5a83c5555949d 100644 --- a/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java @@ -19,16 +19,27 @@ package org.elasticsearch.discovery; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService.HandshakeResponse; @@ -44,10 +55,13 @@ import static org.elasticsearch.discovery.HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.oneOf; public class HandshakingTransportAddressConnectorTests extends ESTestCase { private DiscoveryNode remoteNode; + private TransportAddress discoveryAddress; private TransportService transportService; private ThreadPool threadPool; private String remoteClusterName; @@ -55,6 +69,8 @@ public class HandshakingTransportAddressConnectorTests extends ESTestCase { private DiscoveryNode localNode; private boolean dropHandshake; + @Nullable // unless we want the full connection to fail + private TransportException fullConnectionFailure; @Before public void startServices() { @@ -66,17 +82,24 @@ public void startServices() { threadPool = new TestThreadPool("node", settings); remoteNode = null; + discoveryAddress = null; remoteClusterName = null; dropHandshake = false; + fullConnectionFailure = null; final MockTransport mockTransport = new MockTransport() { @Override protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { super.onSendRequest(requestId, action, request, node); assertThat(action, equalTo(TransportService.HANDSHAKE_ACTION_NAME)); - assertEquals(remoteNode.getAddress(), node.getAddress()); + assertThat(discoveryAddress, notNullValue()); + assertThat(node.getAddress(), oneOf(discoveryAddress, remoteNode.getAddress())); if (dropHandshake == false) { - handleResponse(requestId, new HandshakeResponse(remoteNode, new ClusterName(remoteClusterName), Version.CURRENT)); + if (fullConnectionFailure != null && node.getAddress().equals(remoteNode.getAddress())) { + handleError(requestId, fullConnectionFailure); + } else { + handleResponse(requestId, new HandshakeResponse(remoteNode, new ClusterName(remoteClusterName), Version.CURRENT)); + } } } }; @@ -91,7 +114,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req } @After - public void stopServices() throws InterruptedException { + public void stopServices() { transportService.stop(); terminate(threadPool); } @@ -102,8 +125,9 @@ public void testConnectsToMasterNode() throws InterruptedException { remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT); remoteClusterName = "local-cluster"; + discoveryAddress = getDiscoveryAddress(); - handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), new ActionListener() { + handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, new ActionListener() { @Override public void onResponse(DiscoveryNode discoveryNode) { receivedNode.set(discoveryNode); @@ -120,44 +144,84 @@ public void onFailure(Exception e) { assertEquals(remoteNode, receivedNode.get()); } + @TestLogging(reason="ensure logging happens", value="org.elasticsearch.discovery.HandshakingTransportAddressConnector:INFO") + public void testLogsFullConnectionFailureAfterSuccessfulHandshake() throws Exception { + + remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT); + remoteClusterName = "local-cluster"; + discoveryAddress = buildNewFakeTransportAddress(); + + fullConnectionFailure = new ConnectTransportException(remoteNode, "simulated", new ElasticsearchException("root cause")); + + FailureListener failureListener = new FailureListener(); + + MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.start(); + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "message", + HandshakingTransportAddressConnector.class.getCanonicalName(), + Level.WARN, + "*completed handshake with [*] but followup connection failed*")); + Logger targetLogger = LogManager.getLogger(HandshakingTransportAddressConnector.class); + Loggers.addAppender(targetLogger, mockAppender); + + try { + handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, failureListener); + failureListener.assertFailure(); + mockAppender.assertAllExpectationsMatched(); + } finally { + Loggers.removeAppender(targetLogger, mockAppender); + mockAppender.stop(); + } + } + public void testDoesNotConnectToNonMasterNode() throws InterruptedException { remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + discoveryAddress = getDiscoveryAddress(); remoteClusterName = "local-cluster"; FailureListener failureListener = new FailureListener(); - handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener); + handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, failureListener); failureListener.assertFailure(); } public void testDoesNotConnectToLocalNode() throws Exception { remoteNode = localNode; + discoveryAddress = getDiscoveryAddress(); remoteClusterName = "local-cluster"; FailureListener failureListener = new FailureListener(); - handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener); + handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, failureListener); failureListener.assertFailure(); } public void testDoesNotConnectToDifferentCluster() throws InterruptedException { remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT); + discoveryAddress = getDiscoveryAddress(); remoteClusterName = "another-cluster"; FailureListener failureListener = new FailureListener(); - handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener); + handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, failureListener); failureListener.assertFailure(); } public void testHandshakeTimesOut() throws InterruptedException { remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT); + discoveryAddress = getDiscoveryAddress(); remoteClusterName = "local-cluster"; dropHandshake = true; FailureListener failureListener = new FailureListener(); - handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener); + handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, failureListener); Thread.sleep(PROBE_HANDSHAKE_TIMEOUT_SETTING.get(Settings.EMPTY).millis()); failureListener.assertFailure(); } + private TransportAddress getDiscoveryAddress() { + return randomBoolean() ? remoteNode.getAddress() : buildNewFakeTransportAddress(); + } + private class FailureListener implements ActionListener { final CountDownLatch completionLatch = new CountDownLatch(1);