Skip to content

Commit

Permalink
Fix potential race during TcpTransport close (#39031)
Browse files Browse the repository at this point in the history
Fixed two potential causes for leaked threads during tests:
1. When adding a channel to serverChannels, we add it under a monitor
that we do not use when reading from it. This is potentially unsafe if
there is no other happens-before relationship ensuring the safety of
this.
2. Long-shot but if the thread pool was shutdown before entering this
code, we would silently forget about closing server channels so added
assert.

Strengthened the locking to ensure that once we stop the transport, no
new server channels can be made.

Relates to CI failure issue: #37543
  • Loading branch information
henningandersen authored Feb 18, 2019
1 parent c627559 commit 3d09087
Showing 1 changed file with 19 additions and 18 deletions.
37 changes: 19 additions & 18 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -388,28 +388,28 @@ private InetSocketAddress bindToPort(final String name, final InetAddress hostAd
PortsRange portsRange = new PortsRange(port);
final AtomicReference<Exception> lastException = new AtomicReference<>();
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
boolean success = portsRange.iterate(portNumber -> {
try {
TcpServerChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber));
synchronized (serverChannels) {
List<TcpServerChannel> list = serverChannels.get(name);
if (list == null) {
list = new ArrayList<>();
serverChannels.put(name, list);
}
list.add(channel);
closeLock.writeLock().lock();
try {
if (lifecycle.initialized() == false && lifecycle.started() == false) {
throw new IllegalStateException("transport has been stopped");
}
boolean success = portsRange.iterate(portNumber -> {
try {
TcpServerChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber));
serverChannels.computeIfAbsent(name, k -> new ArrayList<>()).add(channel);
boundSocket.set(channel.getLocalAddress());
} catch (Exception e) {
lastException.set(e);
return false;
}
} catch (Exception e) {
lastException.set(e);
return false;
return true;
});
if (!success) {
throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get());
}
return true;
});
if (!success) {
throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get());
} finally {
closeLock.writeLock().unlock();
}

if (logger.isDebugEnabled()) {
logger.debug("Bound profile [{}] to address {{}}", name, NetworkAddress.format(boundSocket.get()));
}
Expand Down Expand Up @@ -553,6 +553,7 @@ protected final void doClose() {
protected final void doStop() {
final CountDownLatch latch = new CountDownLatch(1);
// make sure we run it on another thread than a possible IO handler thread
assert threadPool.generic().isShutdown() == false : "Must stop transport before terminating underlying threadpool";
threadPool.generic().execute(() -> {
closeLock.writeLock().lock();
try {
Expand Down

0 comments on commit 3d09087

Please sign in to comment.