Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make NodeConnectionsService non-blocking #44211

Merged
merged 1 commit into from
Jul 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,27 @@ private class ConnectionTarget {

private final AtomicInteger consecutiveFailureCount = new AtomicInteger();

private final Runnable connectActivity = () -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
private final Runnable connectActivity = new AbstractRunnable() {

final AbstractRunnable abstractRunnable = this;

@Override
protected void doRun() {
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
transportService.connectToNode(discoveryNode);
consecutiveFailureCount.set(0);
logger.debug("connected to {}", discoveryNode);
onCompletion(ActivityType.CONNECTING, null, disconnectActivity);
transportService.connectToNode(discoveryNode, new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
consecutiveFailureCount.set(0);
logger.debug("connected to {}", discoveryNode);
onCompletion(ActivityType.CONNECTING, null, disconnectActivity);
}

@Override
public void onFailure(Exception e) {
abstractRunnable.onFailure(e);
}
});
}

@Override
Expand All @@ -322,7 +335,7 @@ public void onFailure(Exception e) {
public String toString() {
return "connect to " + discoveryNode;
}
});
};

private final Runnable disconnectActivity = new AbstractRunnable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void testConnectAndDisconnect() throws Exception {
service.connectToNodes(nodes, () -> future.onResponse(null));
future.actionGet();
if (isDisrupting == false) {
assertConnected(nodes);
assertConnected(transportService, nodes);
}
service.disconnectFromNodesExcept(nodes);

Expand Down Expand Up @@ -169,6 +169,11 @@ public void testPeriodicReconnection() {
final DeterministicTaskQueue deterministicTaskQueue
= new DeterministicTaskQueue(builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random());

MockTransport transport = new MockTransport(deterministicTaskQueue.getThreadPool());
TestTransportService transportService = new TestTransportService(transport, deterministicTaskQueue.getThreadPool());
transportService.start();
transportService.acceptIncomingRequests();

final NodeConnectionsService service
= new NodeConnectionsService(settings.build(), deterministicTaskQueue.getThreadPool(), transportService);
service.start();
Expand Down Expand Up @@ -211,7 +216,7 @@ public String toString() {
transport.randomConnectionExceptions = false;
logger.info("renewing connections");
runTasksUntil(deterministicTaskQueue, maxDisconnectionTime + reconnectIntervalMillis);
assertConnectedExactlyToNodes(targetNodes);
assertConnectedExactlyToNodes(transportService, targetNodes);
}

public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
Expand Down Expand Up @@ -314,11 +319,15 @@ private void ensureConnections(NodeConnectionsService service) {
}

private void assertConnectedExactlyToNodes(DiscoveryNodes discoveryNodes) {
assertConnected(discoveryNodes);
assertConnectedExactlyToNodes(transportService, discoveryNodes);
}

private void assertConnectedExactlyToNodes(TransportService transportService, DiscoveryNodes discoveryNodes) {
assertConnected(transportService, discoveryNodes);
assertThat(transportService.getConnectionManager().size(), equalTo(discoveryNodes.getSize()));
}

private void assertConnected(Iterable<DiscoveryNode> nodes) {
private void assertConnected(TransportService transportService, Iterable<DiscoveryNode> nodes) {
for (DiscoveryNode node : nodes) {
assertTrue("not connected to " + node, transportService.nodeConnected(node));
}
Expand All @@ -328,8 +337,9 @@ private void assertConnected(Iterable<DiscoveryNode> nodes) {
@Before
public void setUp() throws Exception {
super.setUp();
this.threadPool = new TestThreadPool(getClass().getName());
this.transport = new MockTransport();
ThreadPool threadPool = new TestThreadPool(getClass().getName());
this.threadPool = threadPool;
this.transport = new MockTransport(threadPool);
nodeConnectionBlocks = newConcurrentMap();
transportService = new TestTransportService(transport, threadPool);
transportService.start();
Expand Down Expand Up @@ -361,21 +371,35 @@ public void handshake(Transport.Connection connection, long timeout, Predicate<C

@Override
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
throw new AssertionError("no blocking connect");
}

@Override
public void connectToNode(DiscoveryNode node, ActionListener<Void> listener) throws ConnectTransportException {
final CheckedRunnable<Exception> connectionBlock = nodeConnectionBlocks.get(node);
if (connectionBlock != null) {
try {
connectionBlock.run();
} catch (Exception e) {
throw new AssertionError(e);
}
getThreadPool().generic().execute(() -> {
try {
connectionBlock.run();
super.connectToNode(node, listener);
} catch (Exception e) {
throw new AssertionError(e);
}
});
} else {
super.connectToNode(node, listener);
}
super.connectToNode(node);
}
}

private final class MockTransport implements Transport {
private ResponseHandlers responseHandlers = new ResponseHandlers();
private volatile boolean randomConnectionExceptions = false;
private final ThreadPool threadPool;

MockTransport(ThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
public <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
Expand Down