diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java index 4b5da7dbbefd..415769596a40 100644 --- a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java @@ -123,6 +123,17 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil currentListener.addListener(listener); + // It's possible that a connection completed, and the pendingConnections entry was removed, between the calls to + // connectedNodes.containsKey and pendingConnections.putIfAbsent above, so we check again to make sure we don't open a redundant + // extra connection to the node. We could _just_ check here, but checking up front skips the work to mark the connection as pending. + if (connectedNodes.containsKey(node)) { + ListenableFuture future = pendingConnections.remove(node); + assert future == currentListener : "Listener in pending map is different than the expected listener"; + connectingRefCounter.decRef(); + future.onResponse(null); + return; + } + final RunOnce releaseOnce = new RunOnce(connectingRefCounter::decRef); internalOpenConnection(node, resolvedProfile, ActionListener.wrap(conn -> { connectionValidator.validate(conn, resolvedProfile, ActionListener.wrap( @@ -130,6 +141,7 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil assert Transports.assertNotTransportThread("connection validator success"); try { if (connectedNodes.putIfAbsent(node, conn) != null) { + assert false : "redundant conection to " + node; logger.debug("existing connection to node [{}], closing new redundant connection", node); IOUtils.closeWhileHandlingException(conn); } else {