Skip to content

Commit

Permalink
elastic#26701 Added option to RST instead of FIN to TcpTransport#clo…
Browse files Browse the repository at this point in the history
…seChannels
  • Loading branch information
original-brownbear committed Sep 25, 2017
1 parent f79f008 commit 1ff8d6e
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 13 deletions.
13 changes: 7 additions & 6 deletions core/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ public Channel channel(TransportRequestOptions.Type type) {
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try {
closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false);
closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false, true);
} finally {
transportService.onConnectionClosed(this);
}
Expand Down Expand Up @@ -640,7 +640,7 @@ private void disconnectFromNodeCloseAndNotify(DiscoveryNode node, NodeChannels n
protected final void closeChannelWhileHandlingExceptions(final Channel channel) {
if (isOpen(channel)) {
try {
closeChannels(Collections.singletonList(channel), false);
closeChannels(Collections.singletonList(channel), false, false);
} catch (IOException e) {
logger.warn("failed to close channel", e);
}
Expand Down Expand Up @@ -902,7 +902,7 @@ protected final void doStop() {
// first stop to accept any incoming connections so nobody can connect to this transport
for (Map.Entry<String, List<Channel>> entry : serverChannels.entrySet()) {
try {
closeChannels(entry.getValue(), true);
closeChannels(entry.getValue(), true, true);
} catch (Exception e) {
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage(
Expand Down Expand Up @@ -975,7 +975,7 @@ protected void onException(Channel channel, Exception e) {
@Override
protected void innerInnerOnResponse(Channel channel) {
try {
closeChannels(Collections.singletonList(channel), false);
closeChannels(Collections.singletonList(channel), false, false);
} catch (IOException e1) {
logger.debug("failed to close httpOnTransport channel", e1);
}
Expand All @@ -984,7 +984,7 @@ protected void innerInnerOnResponse(Channel channel) {
@Override
protected void innerOnFailure(Exception e) {
try {
closeChannels(Collections.singletonList(channel), false);
closeChannels(Collections.singletonList(channel), false, false);
} catch (IOException e1) {
e.addSuppressed(e1);
logger.debug("failed to close httpOnTransport channel", e1);
Expand Down Expand Up @@ -1021,8 +1021,9 @@ protected void innerOnFailure(Exception e) {
*
* @param channels the channels to close
* @param blocking whether the channels should be closed synchronously
* @param closingTransport whether we abort the connection on RST instead of FIN
*/
protected abstract void closeChannels(List<Channel> channels, boolean blocking) throws IOException;
protected abstract void closeChannels(List<Channel> channels, boolean blocking, boolean closingTransport) throws IOException;

/**
* Sends message to channel. The listener's onResponse method will be called when the send is complete unless an exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ protected Object bind(String name, InetSocketAddress address) throws IOException
}

@Override
protected void closeChannels(List channel, boolean blocking) throws IOException {
protected void closeChannels(List channel, boolean blocking, boolean closingTransport) throws IOException {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,12 @@ protected void sendMessage(Channel channel, BytesReference reference, ActionList
}

@Override
protected void closeChannels(final List<Channel> channels, boolean blocking) throws IOException {
protected void closeChannels(final List<Channel> channels, boolean blocking, boolean closingTransport) throws IOException {
if (closingTransport) {
for (Channel channel : channels) {
channel.config().setOption(ChannelOption.SO_LINGER, 0);
}
}
if (blocking) {
Netty4Utils.closeChannels(channels);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,15 @@ protected void sendMessage(MockChannel mockChannel, BytesReference reference, Ac
}

@Override
protected void closeChannels(List<MockChannel> channel, boolean blocking) throws IOException {
IOUtils.close(channel);
protected void closeChannels(List<MockChannel> channels, boolean blocking, boolean closingTransport) throws IOException {
if (closingTransport) {
for (MockChannel channel : channels) {
if (channel.activeChannel != null) {
channel.activeChannel.setSoLinger(true, 0);
}
}
}
IOUtils.close(channels);
}

@Override
Expand Down Expand Up @@ -303,7 +310,6 @@ public void accept(Executor executor) throws IOException {
MockChannel incomingChannel = null;
try {
configureSocket(incomingSocket);
incomingSocket.setSoLinger(true, 0);
synchronized (this) {
if (isOpen.get()) {
incomingChannel = new MockChannel(incomingSocket,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport.nio;

import java.net.StandardSocketOptions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
Expand All @@ -28,7 +29,6 @@
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
Expand Down Expand Up @@ -99,7 +99,12 @@ protected NioServerSocketChannel bind(String name, InetSocketAddress address) th
}

@Override
protected void closeChannels(List<NioChannel> channels, boolean blocking) throws IOException {
protected void closeChannels(List<NioChannel> channels, boolean blocking, boolean closingTransport) throws IOException {
if (closingTransport) {
for (NioChannel channel : channels) {
channel.getRawChannel().setOption(StandardSocketOptions.SO_LINGER, 0);
}
}
ArrayList<CloseFuture> futures = new ArrayList<>(channels.size());
for (final NioChannel channel : channels) {
if (channel != null && channel.isOpen()) {
Expand Down

0 comments on commit 1ff8d6e

Please sign in to comment.