From 3d090879c6a3291dadf7cf9a2ffdbc90def2e239 Mon Sep 17 00:00:00 2001 From: Henning Andersen <33268011+henningandersen@users.noreply.github.com> Date: Mon, 18 Feb 2019 18:10:00 +0100 Subject: [PATCH] Fix potential race during TcpTransport close (#39031) Fixed two potential causes for leaked threads during tests: 1. When adding a channel to serverChannels, we add it under a monitor that we do not use when reading from it. This is potentially unsafe if there is no other happens-before relationship ensuring the safety of this. 2. Long shot, but if the thread pool was shut down before entering this code, we would silently forget about closing server channels, so an assert was added. Strengthened the locking to ensure that once we stop the transport, no new server channels can be made. Relates to CI failure issue: #37543 --- .../elasticsearch/transport/TcpTransport.java | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 2ff5ae1583e37..d5a524105dd01 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -388,28 +388,28 @@ private InetSocketAddress bindToPort(final String name, final InetAddress hostAd PortsRange portsRange = new PortsRange(port); final AtomicReference lastException = new AtomicReference<>(); final AtomicReference boundSocket = new AtomicReference<>(); - boolean success = portsRange.iterate(portNumber -> { - try { - TcpServerChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber)); - synchronized (serverChannels) { - List list = serverChannels.get(name); - if (list == null) { - list = new ArrayList<>(); - serverChannels.put(name, list); - } - list.add(channel); + closeLock.writeLock().lock(); + try { + if (lifecycle.initialized() == false && lifecycle.started() == 
false) { + throw new IllegalStateException("transport has been stopped"); + } + boolean success = portsRange.iterate(portNumber -> { + try { + TcpServerChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber)); + serverChannels.computeIfAbsent(name, k -> new ArrayList<>()).add(channel); boundSocket.set(channel.getLocalAddress()); + } catch (Exception e) { + lastException.set(e); + return false; } - } catch (Exception e) { - lastException.set(e); - return false; + return true; + }); + if (!success) { + throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get()); } - return true; - }); - if (!success) { - throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get()); + } finally { + closeLock.writeLock().unlock(); } - if (logger.isDebugEnabled()) { logger.debug("Bound profile [{}] to address {{}}", name, NetworkAddress.format(boundSocket.get())); } @@ -553,6 +553,7 @@ protected final void doClose() { protected final void doStop() { final CountDownLatch latch = new CountDownLatch(1); // make sure we run it on another thread than a possible IO handler thread + assert threadPool.generic().isShutdown() == false : "Must stop transport before terminating underlying threadpool"; threadPool.generic().execute(() -> { closeLock.writeLock().lock(); try {