From ea5513f2cff601b448fbce807f17a3ea857ec79f Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 11 Jul 2019 14:02:55 +0200 Subject: [PATCH] Make NodeConnectionsService non-blocking (#44211) With connection management now being non-blocking, we can make NodeConnectionsService avoid the use of MANAGEMENT threads that are blocked during the connection attempts. I had to fiddle a bit with the tests as testPeriodicReconnection was using both the mock ThreadPool from the DeterministicTaskQueue and the real ThreadPool initialized at the test class level, which resulted in races. --- .../cluster/NodeConnectionsService.java | 25 +++++++--- .../cluster/NodeConnectionsServiceTests.java | 48 ++++++++++++++----- 2 files changed, 55 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index f48413824d31f..b004af9d38ab6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -297,14 +297,27 @@ private class ConnectionTarget { private final AtomicInteger consecutiveFailureCount = new AtomicInteger(); - private final Runnable connectActivity = () -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { + private final Runnable connectActivity = new AbstractRunnable() { + + final AbstractRunnable abstractRunnable = this; + @Override protected void doRun() { assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; - transportService.connectToNode(discoveryNode); - consecutiveFailureCount.set(0); - logger.debug("connected to {}", discoveryNode); - onCompletion(ActivityType.CONNECTING, null, disconnectActivity); + transportService.connectToNode(discoveryNode, new ActionListener() { + @Override + public void onResponse(Void aVoid) { + assert Thread.holdsLock(mutex) == false : 
"mutex unexpectedly held"; + consecutiveFailureCount.set(0); + logger.debug("connected to {}", discoveryNode); + onCompletion(ActivityType.CONNECTING, null, disconnectActivity); + } + + @Override + public void onFailure(Exception e) { + abstractRunnable.onFailure(e); + } + }); } @Override @@ -322,7 +335,7 @@ public void onFailure(Exception e) { public String toString() { return "connect to " + discoveryNode; } - }); + }; private final Runnable disconnectActivity = new AbstractRunnable() { @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 377cbef84a4c6..e84b8391a7e6c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -131,7 +131,7 @@ public void testConnectAndDisconnect() throws Exception { service.connectToNodes(nodes, () -> future.onResponse(null)); future.actionGet(); if (isDisrupting == false) { - assertConnected(nodes); + assertConnected(transportService, nodes); } service.disconnectFromNodesExcept(nodes); @@ -169,6 +169,11 @@ public void testPeriodicReconnection() { final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random()); + MockTransport transport = new MockTransport(deterministicTaskQueue.getThreadPool()); + TestTransportService transportService = new TestTransportService(transport, deterministicTaskQueue.getThreadPool()); + transportService.start(); + transportService.acceptIncomingRequests(); + final NodeConnectionsService service = new NodeConnectionsService(settings.build(), deterministicTaskQueue.getThreadPool(), transportService); service.start(); @@ -211,7 +216,7 @@ public String toString() { transport.randomConnectionExceptions = false; logger.info("renewing connections"); 
runTasksUntil(deterministicTaskQueue, maxDisconnectionTime + reconnectIntervalMillis); - assertConnectedExactlyToNodes(targetNodes); + assertConnectedExactlyToNodes(transportService, targetNodes); } public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception { @@ -314,11 +319,15 @@ private void ensureConnections(NodeConnectionsService service) { } private void assertConnectedExactlyToNodes(DiscoveryNodes discoveryNodes) { - assertConnected(discoveryNodes); + assertConnectedExactlyToNodes(transportService, discoveryNodes); + } + + private void assertConnectedExactlyToNodes(TransportService transportService, DiscoveryNodes discoveryNodes) { + assertConnected(transportService, discoveryNodes); assertThat(transportService.getConnectionManager().size(), equalTo(discoveryNodes.getSize())); } - private void assertConnected(Iterable nodes) { + private void assertConnected(TransportService transportService, Iterable nodes) { for (DiscoveryNode node : nodes) { assertTrue("not connected to " + node, transportService.nodeConnected(node)); } @@ -328,8 +337,9 @@ private void assertConnected(Iterable nodes) { @Before public void setUp() throws Exception { super.setUp(); - this.threadPool = new TestThreadPool(getClass().getName()); - this.transport = new MockTransport(); + ThreadPool threadPool = new TestThreadPool(getClass().getName()); + this.threadPool = threadPool; + this.transport = new MockTransport(threadPool); nodeConnectionBlocks = newConcurrentMap(); transportService = new TestTransportService(transport, threadPool); transportService.start(); @@ -361,21 +371,35 @@ public void handshake(Transport.Connection connection, long timeout, Predicate listener) throws ConnectTransportException { final CheckedRunnable connectionBlock = nodeConnectionBlocks.get(node); if (connectionBlock != null) { - try { - connectionBlock.run(); - } catch (Exception e) { - throw new AssertionError(e); - } + getThreadPool().generic().execute(() -> { + try { + connectionBlock.run(); + 
super.connectToNode(node, listener); + } catch (Exception e) { + throw new AssertionError(e); + } + }); + } else { + super.connectToNode(node, listener); } - super.connectToNode(node); } } private final class MockTransport implements Transport { private ResponseHandlers responseHandlers = new ResponseHandlers(); private volatile boolean randomConnectionExceptions = false; + private final ThreadPool threadPool; + + MockTransport(ThreadPool threadPool) { + this.threadPool = threadPool; + } @Override public void registerRequestHandler(RequestHandlerRegistry reg) {