Skip to content

Commit

Permalink
Make NodeConnectionsService non-blocking (#44211)
Browse files Browse the repository at this point in the history
With connection management now being non-blocking, we can make NodeConnectionsService
avoid the use of MANAGEMENT threads that are blocked during the connection attempts.

I had to fiddle a bit with the tests as testPeriodicReconnection was using both the mock Threadpool
from the DeterministicTaskQueue as well as the real ThreadPool initialized at the test class level,
which resulted in races.
  • Loading branch information
ywelsch authored Jul 11, 2019
1 parent 3bfabf8 commit fea50c2
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,27 @@ private class ConnectionTarget {

private final AtomicInteger consecutiveFailureCount = new AtomicInteger();

private final Runnable connectActivity = () -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
private final Runnable connectActivity = new AbstractRunnable() {

final AbstractRunnable abstractRunnable = this;

@Override
protected void doRun() {
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
transportService.connectToNode(discoveryNode);
consecutiveFailureCount.set(0);
logger.debug("connected to {}", discoveryNode);
onCompletion(ActivityType.CONNECTING, null, disconnectActivity);
transportService.connectToNode(discoveryNode, new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
consecutiveFailureCount.set(0);
logger.debug("connected to {}", discoveryNode);
onCompletion(ActivityType.CONNECTING, null, disconnectActivity);
}

@Override
public void onFailure(Exception e) {
abstractRunnable.onFailure(e);
}
});
}

@Override
Expand All @@ -322,7 +335,7 @@ public void onFailure(Exception e) {
public String toString() {
return "connect to " + discoveryNode;
}
});
};

private final Runnable disconnectActivity = new AbstractRunnable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void testConnectAndDisconnect() throws Exception {
service.connectToNodes(nodes, () -> future.onResponse(null));
future.actionGet();
if (isDisrupting == false) {
assertConnected(nodes);
assertConnected(transportService, nodes);
}
service.disconnectFromNodesExcept(nodes);

Expand Down Expand Up @@ -169,6 +169,11 @@ public void testPeriodicReconnection() {
final DeterministicTaskQueue deterministicTaskQueue
= new DeterministicTaskQueue(builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random());

MockTransport transport = new MockTransport(deterministicTaskQueue.getThreadPool());
TestTransportService transportService = new TestTransportService(transport, deterministicTaskQueue.getThreadPool());
transportService.start();
transportService.acceptIncomingRequests();

final NodeConnectionsService service
= new NodeConnectionsService(settings.build(), deterministicTaskQueue.getThreadPool(), transportService);
service.start();
Expand Down Expand Up @@ -211,7 +216,7 @@ public String toString() {
transport.randomConnectionExceptions = false;
logger.info("renewing connections");
runTasksUntil(deterministicTaskQueue, maxDisconnectionTime + reconnectIntervalMillis);
assertConnectedExactlyToNodes(targetNodes);
assertConnectedExactlyToNodes(transportService, targetNodes);
}

public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
Expand Down Expand Up @@ -314,11 +319,15 @@ private void ensureConnections(NodeConnectionsService service) {
}

private void assertConnectedExactlyToNodes(DiscoveryNodes discoveryNodes) {
assertConnected(discoveryNodes);
assertConnectedExactlyToNodes(transportService, discoveryNodes);
}

private void assertConnectedExactlyToNodes(TransportService transportService, DiscoveryNodes discoveryNodes) {
assertConnected(transportService, discoveryNodes);
assertThat(transportService.getConnectionManager().size(), equalTo(discoveryNodes.getSize()));
}

private void assertConnected(Iterable<DiscoveryNode> nodes) {
private void assertConnected(TransportService transportService, Iterable<DiscoveryNode> nodes) {
for (DiscoveryNode node : nodes) {
assertTrue("not connected to " + node, transportService.nodeConnected(node));
}
Expand All @@ -328,8 +337,9 @@ private void assertConnected(Iterable<DiscoveryNode> nodes) {
@Before
public void setUp() throws Exception {
super.setUp();
this.threadPool = new TestThreadPool(getClass().getName());
this.transport = new MockTransport();
ThreadPool threadPool = new TestThreadPool(getClass().getName());
this.threadPool = threadPool;
this.transport = new MockTransport(threadPool);
nodeConnectionBlocks = newConcurrentMap();
transportService = new TestTransportService(transport, threadPool);
transportService.start();
Expand Down Expand Up @@ -361,21 +371,35 @@ public void handshake(Transport.Connection connection, long timeout, Predicate<C

@Override
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
throw new AssertionError("no blocking connect");
}

@Override
public void connectToNode(DiscoveryNode node, ActionListener<Void> listener) throws ConnectTransportException {
final CheckedRunnable<Exception> connectionBlock = nodeConnectionBlocks.get(node);
if (connectionBlock != null) {
try {
connectionBlock.run();
} catch (Exception e) {
throw new AssertionError(e);
}
getThreadPool().generic().execute(() -> {
try {
connectionBlock.run();
super.connectToNode(node, listener);
} catch (Exception e) {
throw new AssertionError(e);
}
});
} else {
super.connectToNode(node, listener);
}
super.connectToNode(node);
}
}

private final class MockTransport implements Transport {
private ResponseHandlers responseHandlers = new ResponseHandlers();
private volatile boolean randomConnectionExceptions = false;
private final ThreadPool threadPool;

MockTransport(ThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
public <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
Expand Down

0 comments on commit fea50c2

Please sign in to comment.