From 1ff8d6e86aa594093150b5a59965fd234b155a1e Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 25 Sep 2017 20:33:42 +0200 Subject: [PATCH] #26701 Added option to RST instead of FIN to TcpTransport#closeChannels --- .../org/elasticsearch/transport/TcpTransport.java | 13 +++++++------ .../elasticsearch/transport/TCPTransportTests.java | 2 +- .../transport/netty4/Netty4Transport.java | 7 ++++++- .../elasticsearch/transport/MockTcpTransport.java | 12 +++++++++--- .../elasticsearch/transport/nio/NioTransport.java | 9 +++++++-- 5 files changed, 30 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 1af4f101e04a2..e9de9aaa3c819 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -442,7 +442,7 @@ public Channel channel(TransportRequestOptions.Type type) { public void close() throws IOException { if (closed.compareAndSet(false, true)) { try { - closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false); + closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false, true); } finally { transportService.onConnectionClosed(this); } @@ -640,7 +640,7 @@ private void disconnectFromNodeCloseAndNotify(DiscoveryNode node, NodeChannels n protected final void closeChannelWhileHandlingExceptions(final Channel channel) { if (isOpen(channel)) { try { - closeChannels(Collections.singletonList(channel), false); + closeChannels(Collections.singletonList(channel), false, false); } catch (IOException e) { logger.warn("failed to close channel", e); } @@ -902,7 +902,7 @@ protected final void doStop() { // first stop to accept any incoming connections so nobody can connect to this transport for (Map.Entry> entry : serverChannels.entrySet()) { try { - closeChannels(entry.getValue(), true); + closeChannels(entry.getValue(), true, true); } catch (Exception e) { logger.debug( (Supplier) () -> new ParameterizedMessage( @@ -975,7 +975,7 @@ protected void onException(Channel channel, Exception e) { @Override protected void innerInnerOnResponse(Channel channel) { try { - closeChannels(Collections.singletonList(channel), false); + closeChannels(Collections.singletonList(channel), false, false); } catch (IOException e1) { logger.debug("failed to close httpOnTransport channel", e1); } @@ -984,7 +984,7 @@ protected void innerInnerOnResponse(Channel channel) { @Override protected void innerOnFailure(Exception e) { try { - closeChannels(Collections.singletonList(channel), false); + closeChannels(Collections.singletonList(channel), false, false); } catch (IOException e1) { e.addSuppressed(e1); logger.debug("failed to close httpOnTransport channel", e1); @@ -1021,8 +1021,9 @@ protected void innerOnFailure(Exception e) { * * @param channels the channels to close * @param blocking whether the channels should be closed synchronously + * @param closingTransport whether we abort the connection on RST instead of FIN */ - protected abstract void closeChannels(List channels, boolean blocking) throws IOException; + protected abstract void closeChannels(List channels, boolean blocking, boolean closingTransport) throws IOException; /** * Sends message to channel. The listener's onResponse method will be called when the send is complete unless an exception diff --git a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java index c386b2865afa3..55457cc8ae431 100644 --- a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java @@ -191,7 +191,7 @@ protected Object bind(String name, InetSocketAddress address) throws IOException } @Override - protected void closeChannels(List channel, boolean blocking) throws IOException { + protected void closeChannels(List channel, boolean blocking, boolean closingTransport) throws IOException { } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index d92066e1fcc63..a196976dc125e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -331,7 +331,12 @@ protected void sendMessage(Channel channel, BytesReference reference, ActionList } @Override - protected void closeChannels(final List channels, boolean blocking) throws IOException { + protected void closeChannels(final List channels, boolean blocking, boolean closingTransport) throws IOException { + if (closingTransport) { + for (Channel channel : channels) { + channel.config().setOption(ChannelOption.SO_LINGER, 0); + } + } if (blocking) { Netty4Utils.closeChannels(channels); } else { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 19ead898997db..085469059bf61 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -242,8 +242,15 @@ protected void sendMessage(MockChannel mockChannel, BytesReference reference, Ac } @Override - protected void closeChannels(List channel, boolean blocking) throws IOException { - IOUtils.close(channel); + protected void closeChannels(List channels, boolean blocking, boolean closingTransport) throws IOException { + if (closingTransport) { + for (MockChannel channel : channels) { + if (channel.activeChannel != null) { + channel.activeChannel.setSoLinger(true, 0); + } + } + } + IOUtils.close(channels); } @Override @@ -303,7 +310,6 @@ public void accept(Executor executor) throws IOException { MockChannel incomingChannel = null; try { configureSocket(incomingSocket); - incomingSocket.setSoLinger(true, 0); synchronized (this) { if (isOpen.get()) { incomingChannel = new MockChannel(incomingSocket, diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 5a9dba29f5e57..b22feb5697695 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.nio; +import java.net.StandardSocketOptions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -28,7 +29,6 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -99,7 +99,12 @@ protected NioServerSocketChannel bind(String name, InetSocketAddress address) th } @Override - protected void closeChannels(List channels, boolean blocking) throws IOException { + protected void closeChannels(List channels, boolean blocking, boolean closingTransport) throws IOException { + if (closingTransport) { + for (NioChannel channel : channels) { + channel.getRawChannel().setOption(StandardSocketOptions.SO_LINGER, 0); + } + } ArrayList futures = new ArrayList<>(channels.size()); for (final NioChannel channel : channels) { if (channel != null && channel.isOpen()) {